Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions guides/distributed.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ Kino.start_child!(
],
min: 0,
max: 10,
max_concurrency: 1,
backend: {FLAME.FlyBackend,
cpu_kind: "performance", cpus: 4, memory_mb: 8192,
token: System.fetch_env!("FLY_API_TOKEN"),
env: Map.take(System.get_env(), ["LIVEBOOK_COOKIE"])
env: %{"LIVEBOOK_COOKIE" => Atom.to_string(Node.get_cookie())}
},
boot_timeout: 120_000,
idle_shutdown_after: :timer.minutes(5)}
)

Expand All @@ -115,7 +117,8 @@ Key Livebook-specific settings:
- **`Kino.start_child!/1`** — supervises the pool under Livebook's runtime
- **`sync_beams`** — syncs notebook-compiled beam files to runners
- **`start_apps: true`** — starts all applications (including `:dux`) on runners
- **`LIVEBOOK_COOKIE`** — required for BEAM distribution between nodes
- **`LIVEBOOK_COOKIE`** — required for BEAM distribution between nodes (use `Node.get_cookie()`, not the env var)
- **`max_concurrency: 1`** — one DuckDB worker per machine. DuckDB saturates cores internally; multiple workers on one machine just adds contention.

#### Deployed Elixir app

Expand All @@ -126,6 +129,7 @@ Add the FLAME pool to your application supervision tree:
children = [
{FLAME.Pool,
name: :dux_pool,
max_concurrency: 1,
backend: {FLAME.FlyBackend,
token: System.fetch_env!("FLY_API_TOKEN"),
cpus: 4, memory_mb: 16_384},
Expand Down
2 changes: 1 addition & 1 deletion lib/dux.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ defmodule Dux do

import Dux.SQL.Helpers, only: [qi: 1]

defstruct [:source, :remote, :workers, ops: [], names: [], dtypes: %{}, groups: []]
defstruct [:source, :remote, :workers, :meta, ops: [], names: [], dtypes: %{}, groups: []]

@type source ::
{:parquet, String.t()}
Expand Down
76 changes: 45 additions & 31 deletions lib/dux/flame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ if Code.ensure_loaded?(FLAME) do
],
min: 0,
max: 10,
max_concurrency: 1,
backend: {FLAME.FlyBackend,
cpu_kind: "performance", cpus: 4, memory_mb: 8192,
token: System.fetch_env!("FLY_API_TOKEN"),
env: Map.take(System.get_env(), ["LIVEBOOK_COOKIE"])
env: %{"LIVEBOOK_COOKIE" => Atom.to_string(Node.get_cookie())}
},
boot_timeout: 120_000,
idle_shutdown_after: :timer.minutes(5)}
)

Expand All @@ -45,6 +47,7 @@ if Code.ensure_loaded?(FLAME) do
name: :dux_pool,
backend: {FLAME.FlyBackend, ...},
max: 10,
max_concurrency: 1,
code_sync: [start_apps: [:dux], copy_apps: true],
idle_shutdown_after: :timer.minutes(5)}

Expand All @@ -55,6 +58,8 @@ if Code.ensure_loaded?(FLAME) do
After idle timeout, FLAME auto-terminates the runners.
"""

alias Dux.Remote.Worker

@default_pool Dux.FlamePool

@doc """
Expand All @@ -69,56 +74,65 @@ if Code.ensure_loaded?(FLAME) do
"""
def spin_up(n, opts \\ []) when is_integer(n) and n > 0 do
pool = Keyword.get(opts, :pool, @default_pool)
setup = Keyword.get(opts, :setup)

# Place workers sequentially. With max_concurrency: 1, each placed
# child holds its concurrency slot permanently, so the next
# place_child boots a new runner. Sequential avoids internal FLAME
# GenServer timeouts that occur with concurrent placement.
workers =
for _ <- 1..n do
{:ok, pid} = FLAME.place_child(pool, {Dux.Remote.Worker, []})
pid
end

await_pg_registration(workers)
workers
end

defp await_pg_registration(workers, timeout_ms \\ 5_000) do
expected = MapSet.new(workers)
deadline = System.monotonic_time(:millisecond) + timeout_ms
do_await_pg(expected, deadline)
end

defp do_await_pg(expected, deadline) do
registered =
:pg.get_members(:dux, Dux.Remote.Worker)
|> MapSet.new()

if MapSet.subset?(expected, registered) do
:ok
else
if System.monotonic_time(:millisecond) > deadline do
# Best-effort: proceed even if not all registered yet
:ok
else
Process.sleep(10)
do_await_pg(expected, deadline)
end
# Run setup callback on each worker (e.g. create S3 secrets, load extensions)
if setup do
workers
|> Task.async_stream(
fn worker -> GenServer.call(worker, {:setup, setup}, 30_000) end,
timeout: 60_000
)
|> Enum.each(fn
{:ok, :ok} -> :ok
{:ok, {:error, reason}} -> raise "Worker setup failed: #{reason}"
{:exit, reason} -> raise "Worker setup crashed: #{inspect(reason)}"
end)
end

workers
end

@doc """
Get status of the FLAME-backed Dux cluster.

Returns worker count and PIDs, grouped by node.
Pass the workers list returned by `spin_up/2`, or omit to
discover workers via `:pg` (may not find remote FLAME workers).

Returns worker count grouped by node, with alive status.
"""
alias Dux.Remote.Worker
def status(workers_or_pool \\ @default_pool)

def status(workers) when is_list(workers) do
nodes =
workers
|> Enum.group_by(&node/1)
|> Map.new(fn {n, pids} -> {n, length(pids)} end)

%{
total_workers: length(workers),
nodes: nodes,
worker_pids: workers
}
end

def status(pool \\ @default_pool) do
def status(pool) when is_atom(pool) do
workers = Worker.list()

nodes =
workers
|> Enum.group_by(&node/1)
|> Enum.map(fn {node, pids} -> {node, length(pids)} end)
|> Map.new()
|> Map.new(fn {n, pids} -> {n, length(pids)} end)

%{
pool: pool,
Expand Down
94 changes: 57 additions & 37 deletions lib/dux/remote/coordinator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ defmodule Dux.Remote.Coordinator do
raise ArgumentError, "no workers available for distributed execution"
end

start_time = System.monotonic_time()

# Resolve sources that can be partitioned at the coordinator level.
# DuckLake attached sources → file manifest for direct parquet reads.
pipeline = resolve_ducklake_source(pipeline)
Expand All @@ -64,42 +66,61 @@ defmodule Dux.Remote.Coordinator do
} = split = PipelineSplitter.split(pipeline.ops)

# Preprocess joins: broadcast/shuffle right sides that aren't worker-safe
case preprocess_joins(worker_ops, workers, timeout, bcast_threshold) do
{:ok, processed_ops, broadcast_tables} ->
# All joins handled inline (broadcast or push-down)
worker_pipeline = %{pipeline | ops: processed_ops}
result =
case preprocess_joins(worker_ops, workers, timeout, bcast_threshold) do
{:ok, processed_ops, broadcast_tables} ->
# All joins handled inline (broadcast or push-down)
worker_pipeline = %{pipeline | ops: processed_ops}

try do
result =
execute_fan_out(worker_pipeline, workers, strategy, timeout, streaming?, split)

result = apply_avg_rewrites(result, rewrites)
finalize(result, coord_ops)
after
cleanup_broadcast_tables(workers, broadcast_tables)
end

{:shuffle, ops_before, {right_computed, how, on_cols, suffix}, ops_after,
broadcast_tables} ->
# Pipeline needs a shuffle stage: execute pre-join → shuffle → post-join
try do
execute_with_shuffle(%{
pipeline: pipeline,
ops_before: ops_before,
right: right_computed,
how: how,
on_cols: on_cols,
suffix: suffix,
ops_after: ops_after,
coord_ops: coord_ops,
rewrites: rewrites,
workers: workers,
strategy: strategy,
timeout: timeout
})
after
cleanup_broadcast_tables(workers, broadcast_tables)
end
end

try do
result =
execute_fan_out(worker_pipeline, workers, strategy, timeout, streaming?, split)
# Attach execution metadata
total_ms =
System.convert_time_unit(System.monotonic_time() - start_time, :native, :millisecond)

result = apply_avg_rewrites(result, rewrites)
finalize(result, coord_ops)
after
cleanup_broadcast_tables(workers, broadcast_tables)
end
nodes = workers |> Enum.map(&node/1) |> Enum.uniq()

{:shuffle, ops_before, {right_computed, how, on_cols, suffix}, ops_after, broadcast_tables} ->
# Pipeline needs a shuffle stage: execute pre-join → shuffle → post-join
try do
execute_with_shuffle(%{
pipeline: pipeline,
ops_before: ops_before,
right: right_computed,
how: how,
on_cols: on_cols,
suffix: suffix,
ops_after: ops_after,
coord_ops: coord_ops,
rewrites: rewrites,
workers: workers,
strategy: strategy,
timeout: timeout
})
after
cleanup_broadcast_tables(workers, broadcast_tables)
end
end
meta = %{
distributed: true,
n_workers: length(workers),
n_nodes: length(nodes),
nodes: nodes,
merge_strategy: if(streaming?, do: :streaming, else: :batch),
total_duration_ms: total_ms
}

%{result | meta: meta}
end

@doc """
Expand Down Expand Up @@ -290,12 +311,11 @@ defmodule Dux.Remote.Coordinator do

case result do
{:ok, ipc} ->
duration = System.monotonic_time() - start_time

:telemetry.execute(
[:dux, :distributed, :worker, :stop],
%{
duration: System.monotonic_time() - start_time,
ipc_bytes: byte_size(ipc)
},
%{duration: duration, ipc_bytes: byte_size(ipc)},
%{worker: worker, worker_index: idx, n_workers: n_workers}
)

Expand Down
10 changes: 6 additions & 4 deletions lib/dux/remote/partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ defmodule Dux.Remote.Partitioner do

if file_col && rows_col do
size_map =
Enum.zip(Enum.to_list(file_col), Enum.to_list(rows_col))
Enum.zip(Adbc.Column.to_list(file_col), Adbc.Column.to_list(rows_col))
|> Map.new()

Enum.map(files, fn f -> {f, Map.get(size_map, f, 0)} end)
Expand All @@ -206,8 +206,10 @@ defmodule Dux.Remote.Partitioner do
end
end

defp find_column(columns, name) do
Enum.find(columns, fn col -> col.field.name == name end)
defp find_column(batches, name) do
batches
|> List.flatten()
|> Enum.find(fn col -> col.field.name == name end)
end

defp all_local?(files) do
Expand Down Expand Up @@ -277,7 +279,7 @@ defmodule Dux.Remote.Partitioner do

case find_column(materialized.data, "file") do
nil -> {:ok, [path]}
col -> {:ok, Enum.to_list(col)}
col -> {:ok, Adbc.Column.to_list(col)}
end

{:error, _} ->
Expand Down
29 changes: 29 additions & 0 deletions lib/dux/remote/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ defmodule Dux.Remote.Worker do
|> Enum.filter(&(node(&1) == node))
end

@doc """
Run a setup function on the worker's node.

The function runs in the worker's GenServer process. Use this to configure
the worker's DuckDB (e.g. create S3 secrets, load extensions).

Worker.setup(worker, fn ->
Dux.create_secret(:s3, type: :s3, region: "us-west-2")
end)
"""
def setup(worker, fun, timeout \\ 30_000) when is_function(fun, 0) do
GenServer.call(worker, {:setup, fun}, timeout)
end

@doc """
Execute a `%Dux{}` pipeline on a worker. Returns `{:ok, ipc_binary}` or `{:error, reason}`.

Expand Down Expand Up @@ -157,6 +171,21 @@ defmodule Dux.Remote.Worker do
{:ok, %{db: db, conn: conn, tables: %{}}}
end

@impl true
def handle_call({:setup, fun}, _from, state) when is_function(fun, 0) do
# Setup runs on the worker's node. Dux.exec/1 and Dux.create_secret/2
# will use this node's Dux.Connection (the worker's DuckDB).
result =
try do
fun.()
:ok
rescue
e -> {:error, Exception.message(e)}
end

{:reply, result, state}
end

@impl true
def handle_call({:execute, %Dux{} = pipeline}, _from, %{conn: conn} = state) do
result =
Expand Down
13 changes: 11 additions & 2 deletions test/dux/flame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,20 @@ if Code.ensure_loaded?(FLAME) do
end

describe "status" do
test "returns cluster info", %{pool: pool} do
test "returns cluster info with workers list", %{pool: pool} do
workers = Dux.Flame.spin_up(2, pool: pool)
status = Dux.Flame.status(workers)

assert status.total_workers == 2
assert is_map(status.nodes)
assert is_list(status.worker_pids)
end

test "returns cluster info with pool name", %{pool: pool} do
_workers = Dux.Flame.spin_up(2, pool: pool)
status = Dux.Flame.status(pool)

assert status.total_workers >= 2
assert status.total_workers >= 0
assert is_map(status.nodes)
end
end
Expand Down
Loading