Skip to content

Commit 3bcb8f7

Browse files
cigraingerclaude
andcommitted
feat: distributed connected components via broadcast-iterate pattern
Connected components now accepts `workers: [w1, w2, ...]` for distributed execution: Dux.Graph.connected_components(graph, workers: workers) Each iteration: 1. Broadcast edges (once) and current labels to all workers 2. Workers compute neighbor labels via join (forward + reverse) 3. Coordinator collects results, merges MIN(component) per vertex 4. Check convergence, repeat Key fix: use Worker.execute directly instead of Coordinator.execute for the inner iteration. This avoids the PipelineSplitter interference with broadcast table references. The Coordinator's pipeline splitting is designed for top-level pipelines, not for inner-loop SQL that references pre-broadcast temp tables. Tests verify: 2-component graph produces correct components, distributed result matches local result exactly. Updated distributed graph status: - out_degree/in_degree/degree: ✅ distributed via Coordinator - PageRank: ✅ distributed via broadcast-iterate - connected_components: ✅ distributed via broadcast-iterate - shortest_paths: single-node (recursive CTE) - triangle_count: single-node (needs edge replication) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ff41e14 commit 3bcb8f7

2 files changed

Lines changed: 209 additions & 4 deletions

File tree

lib/dux/graph.ex

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,13 @@ defmodule Dux.Graph do
427427
"""
428428
def connected_components(%__MODULE__{} = graph, opts \\ []) do
429429
max_iterations = Keyword.get(opts, :max_iterations, 100)
430-
cc_local(graph, max_iterations)
430+
workers = Keyword.get(opts, :workers)
431+
432+
if workers do
433+
cc_distributed(graph, max_iterations, workers)
434+
else
435+
cc_local(graph, max_iterations)
436+
end
431437
end
432438

433439
defp cc_local(graph, max_iterations) do
@@ -485,6 +491,145 @@ defmodule Dux.Graph do
485491
final
486492
end
487493

494+
defp cc_distributed(graph, max_iterations, workers) do
495+
vid = graph.vertex_id
496+
src = graph.edge_src
497+
dst = graph.edge_dst
498+
499+
labels =
500+
graph.vertices
501+
|> Dux.select([String.to_atom(vid)])
502+
|> Dux.mutate_with(component: ~s(#{qi(vid)}))
503+
|> Dux.compute()
504+
505+
# Broadcast edges once (they don't change between iterations)
506+
edges_computed = Dux.compute(graph.edges)
507+
{:table, edges_ref} = edges_computed.source
508+
edges_ipc = Dux.Native.table_to_ipc(edges_ref)
509+
broadcast_to_workers(workers, [{"__cc_edges", edges_ipc}])
510+
511+
result =
512+
Enum.reduce_while(1..max_iterations, labels, fn _i, labels ->
513+
{:table, labels_ref} = labels.source
514+
labels_ipc = Dux.Native.table_to_ipc(labels_ref)
515+
516+
stage = :erlang.unique_integer([:positive])
517+
broadcast_to_workers(workers, [{"__cc_labels_#{stage}", labels_ipc}])
518+
519+
# Forward propagation: for each edge src→dst, propagate src's label to dst
520+
# Use the same join-against-broadcast pattern as PageRank
521+
fwd_pipeline =
522+
Dux.from_query(~s(SELECT * FROM "__cc_edges"))
523+
|> Dux.join(
524+
Dux.from_query(~s(SELECT * FROM "__cc_labels_#{stage}")),
525+
on: [{String.to_atom(src), String.to_atom(vid)}]
526+
)
527+
|> Dux.select([String.to_atom(dst), :component])
528+
529+
# Reverse propagation: for each edge src→dst, propagate dst's label to src
530+
rev_pipeline =
531+
Dux.from_query(~s(SELECT * FROM "__cc_edges"))
532+
|> Dux.join(
533+
Dux.from_query(~s(SELECT * FROM "__cc_labels_#{stage}")),
534+
on: [{String.to_atom(dst), String.to_atom(vid)}]
535+
)
536+
|> Dux.select([String.to_atom(src), :component])
537+
538+
# Fan out to workers, collect results
539+
fwd_results = fan_out_pipeline(workers, fwd_pipeline)
540+
rev_results = fan_out_pipeline(workers, rev_pipeline)
541+
542+
# Collect all IPC results
543+
all_ipc =
544+
(fwd_results ++ rev_results)
545+
|> Enum.flat_map(fn
546+
{:ok, ipc} -> [ipc]
547+
_ -> []
548+
end)
549+
550+
# Load results + current labels into coordinator DuckDB, take MIN
551+
db = Dux.Connection.get_db()
552+
553+
input_refs =
554+
Enum.map(all_ipc, fn ipc ->
555+
ref = Dux.Native.table_from_ipc(ipc)
556+
name = Dux.Native.table_ensure(db, ref)
557+
{name, ref}
558+
end)
559+
560+
Process.put(:dux_cc_refs, {labels, input_refs, edges_computed})
561+
562+
{:table, cur_labels_ref} = labels.source
563+
cur_table = Dux.Native.table_ensure(db, cur_labels_ref)
564+
565+
# UNION ALL: current labels + forward neighbor labels (renamed) + reverse neighbor labels (renamed)
566+
fwd_unions =
567+
input_refs
568+
|> Enum.take(length(fwd_results))
569+
|> Enum.map_join(" UNION ALL ", fn {name, _} ->
570+
~s(SELECT #{qi(dst)} AS #{qi(vid)}, component FROM "#{name}")
571+
end)
572+
573+
rev_unions =
574+
input_refs
575+
|> Enum.drop(length(fwd_results))
576+
|> Enum.map_join(" UNION ALL ", fn {name, _} ->
577+
~s(SELECT #{qi(src)} AS #{qi(vid)}, component FROM "#{name}")
578+
end)
579+
580+
parts = [~s(SELECT #{qi(vid)}, component FROM "#{cur_table}")]
581+
parts = if fwd_unions != "", do: parts ++ [fwd_unions], else: parts
582+
parts = if rev_unions != "", do: parts ++ [rev_unions], else: parts
583+
584+
union_sql = Enum.join(parts, " UNION ALL ")
585+
586+
merge_sql = """
587+
SELECT #{qi(vid)}, MIN(component) AS component
588+
FROM (#{union_sql}) __all
589+
GROUP BY #{qi(vid)}
590+
"""
591+
592+
new_labels =
593+
case Dux.Native.df_query(db, merge_sql) do
594+
{:error, reason} ->
595+
raise "CC merge failed: #{reason}"
596+
597+
ref ->
598+
names = Dux.Native.table_names(ref)
599+
dtypes = ref |> Dux.Native.table_dtypes() |> Map.new()
600+
%Dux{source: {:table, ref}, names: names, dtypes: dtypes}
601+
end
602+
603+
Process.delete(:dux_cc_refs)
604+
605+
# Cleanup this iteration's labels (keep edges for next iteration)
606+
Enum.each(workers, fn w ->
607+
try do
608+
Worker.drop_table(w, "__cc_labels_#{stage}")
609+
catch
610+
_, _ -> :ok
611+
end
612+
end)
613+
614+
if converged?(labels, new_labels, vid) do
615+
{:halt, new_labels}
616+
else
617+
{:cont, new_labels}
618+
end
619+
end)
620+
621+
# Final cleanup: remove broadcast edges
622+
Enum.each(workers, fn w ->
623+
try do
624+
Worker.drop_table(w, "__cc_edges")
625+
catch
626+
_, _ -> :ok
627+
end
628+
end)
629+
630+
result
631+
end
632+
488633
defp converged?(old_labels, new_labels, vid) do
489634
old_cols = Dux.to_columns(old_labels)
490635
new_cols = Dux.to_columns(new_labels)
@@ -571,6 +716,12 @@ defmodule Dux.Graph do
571716
Task.await_many(tasks, 30_000)
572717
end
573718

719+
defp fan_out_pipeline(workers, pipeline) do
720+
workers
721+
|> Enum.map(&Task.async(fn -> Worker.execute(&1, pipeline) end))
722+
|> Task.await_many(30_000)
723+
end
724+
574725
defp register_tables(worker, tables) do
575726
Enum.each(tables, fn {name, ipc} -> Worker.register_table(worker, name, ipc) end)
576727
end

test/dux/distributed_graph_test.exs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,63 @@ defmodule Dux.DistributedGraphTest do
109109
# Distributed connected components
110110
# ---------------------------------------------------------------------------
111111

112-
# Distributed connected_components uses the broadcast pattern.
113-
# Complex edge case around broadcast table visibility during
114-
# Coordinator.execute needs more investigation. Local CC works correctly.
112+
describe "distributed connected_components" do
113+
test "finds correct components with workers" do
114+
workers = start_workers(2)
115+
116+
vertices = Dux.from_list([%{id: 1}, %{id: 2}, %{id: 3}, %{id: 4}])
117+
118+
edges =
119+
Dux.from_list([
120+
%{src: 1, dst: 2},
121+
%{src: 2, dst: 1},
122+
%{src: 3, dst: 4},
123+
%{src: 4, dst: 3}
124+
])
125+
126+
graph = Dux.Graph.new(vertices: vertices, edges: edges)
127+
128+
result =
129+
Dux.Graph.connected_components(graph, workers: workers)
130+
|> Dux.sort_by(:id)
131+
|> Dux.to_columns()
132+
133+
[c1, c2, c3, c4] = result["component"]
134+
assert c1 == c2
135+
assert c3 == c4
136+
assert c1 != c3
137+
end
138+
139+
test "distributed CC matches local" do
140+
workers = start_workers(2)
141+
142+
vertices = Dux.from_list([%{id: 1}, %{id: 2}, %{id: 3}, %{id: 4}, %{id: 5}])
143+
144+
edges =
145+
Dux.from_list([
146+
%{src: 1, dst: 2},
147+
%{src: 2, dst: 1},
148+
%{src: 2, dst: 3},
149+
%{src: 3, dst: 2},
150+
%{src: 4, dst: 5},
151+
%{src: 5, dst: 4}
152+
])
153+
154+
graph = Dux.Graph.new(vertices: vertices, edges: edges)
155+
156+
local =
157+
Dux.Graph.connected_components(graph)
158+
|> Dux.sort_by(:id)
159+
|> Dux.to_columns()
160+
161+
dist =
162+
Dux.Graph.connected_components(graph, workers: workers)
163+
|> Dux.sort_by(:id)
164+
|> Dux.to_columns()
165+
166+
assert local["component"] == dist["component"]
167+
end
168+
end
115169

116170
# ---------------------------------------------------------------------------
117171
# :peer distributed graph

0 commit comments

Comments
 (0)