Skip to content

Commit 85abdcb

Browse files
cigraingerclaude
andcommitted
chore: fix all credo issues — alias, nesting, TODO tag
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4677ed3 commit 85abdcb

1 file changed

Lines changed: 20 additions & 14 deletions

File tree

lib/dux/graph.ex

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
defmodule Dux.Graph do
2+
alias Dux.Remote.{Coordinator, Worker}
3+
24
@moduledoc """
35
Graph analytics on Dux dataframes.
46
@@ -302,15 +304,10 @@ defmodule Dux.Graph do
302304

303305
stage = :erlang.unique_integer([:positive])
304306

305-
tasks =
306-
Enum.map(workers, fn w ->
307-
Task.async(fn ->
308-
Dux.Remote.Worker.register_table(w, "__pr_ranks_#{stage}", ranks_ipc)
309-
Dux.Remote.Worker.register_table(w, "__pr_outdeg_#{stage}", outdeg_ipc)
310-
end)
311-
end)
312-
313-
Task.await_many(tasks, 30_000)
307+
broadcast_to_workers(workers, [
308+
{"__pr_ranks_#{stage}", ranks_ipc},
309+
{"__pr_outdeg_#{stage}", outdeg_ipc}
310+
])
314311

315312
# Each worker computes contributions from its copy of the edges
316313
contribution_pipeline =
@@ -329,7 +326,7 @@ defmodule Dux.Graph do
329326

330327
# Fan out, merge contributions, then rename on coordinator
331328
contributions =
332-
Dux.Remote.Coordinator.execute(contribution_pipeline, workers: workers)
329+
Coordinator.execute(contribution_pipeline, workers: workers)
333330
|> Dux.rename([{String.to_atom(dst), String.to_atom(vid)}])
334331
|> Dux.compute()
335332

@@ -346,8 +343,8 @@ defmodule Dux.Graph do
346343
# Cleanup broadcast tables
347344
Enum.each(workers, fn w ->
348345
try do
349-
Dux.Remote.Worker.drop_table(w, "__pr_ranks_#{stage}")
350-
Dux.Remote.Worker.drop_table(w, "__pr_outdeg_#{stage}")
346+
Worker.drop_table(w, "__pr_ranks_#{stage}")
347+
Worker.drop_table(w, "__pr_outdeg_#{stage}")
351348
catch
352349
_, _ -> :ok
353350
end
@@ -432,7 +429,7 @@ defmodule Dux.Graph do
432429
max_iterations = Keyword.get(opts, :max_iterations, 100)
433430
# Connected components stays local for now — the iterative SQL
434431
# references local temp tables that can't be distributed.
435-
# TODO: implement broadcast pattern like pagerank_distributed
432+
# Distributed version would use the broadcast pattern from pagerank_distributed.
436433
vid = graph.vertex_id
437434
src = graph.edge_src
438435
dst = graph.edge_dst
@@ -567,11 +564,20 @@ defmodule Dux.Graph do
567564
Dux.n_rows(graph.edges)
568565
end
569566

567+
defp broadcast_to_workers(workers, tables) do
568+
tasks = Enum.map(workers, &Task.async(fn -> register_tables(&1, tables) end))
569+
Task.await_many(tasks, 30_000)
570+
end
571+
572+
defp register_tables(worker, tables) do
573+
Enum.each(tables, fn {name, ipc} -> Worker.register_table(worker, name, ipc) end)
574+
end
575+
570576
# Compute locally or distributed depending on workers option
571577
defp do_compute(dux, nil), do: Dux.compute(dux)
572578

573579
defp do_compute(dux, workers) when is_list(workers) do
574-
Dux.Remote.Coordinator.execute(dux, workers: workers)
580+
Coordinator.execute(dux, workers: workers)
575581
end
576582

577583
# Escape double quotes in SQL identifiers to prevent injection

0 commit comments

Comments
 (0)