Commit 4c47480

cigrainger and claude committed
docs: FLAME cluster guide, cheatsheet updates, README benchmarks
New guide: flame-clusters.livemd
- Full walkthrough from zero to 5-machine cluster with FLAME + Fly.io
- Uses Ookla Speedtest open dataset (~20GB public Parquet on S3)
- Covers: anonymous S3 access, FLAME pool config, spin_up with memory
  limits and setup callbacks, distributed queries, joins, SQL macros
  on workers, distributed writes, monitoring, cleanup
- Runnable as a Livebook on Fly.io

Cheatsheet updates:
- Added SQL macros section (define, define_table, undefine, list_macros)
- Added grouping section (group_by, ungroup)
- Added exec/1 to SQL section
- Updated FLAME section with memory_limit, temp_directory, local/1

README:
- Added performance section with Dux vs Explorer (Polars) benchmarks
- Added FLAME clusters guide to guides list

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e76b58c commit 4c47480

4 files changed

Lines changed: 312 additions & 2 deletions

README.md

Lines changed: 14 additions & 0 deletions
@@ -22,6 +22,19 @@ Dux.from_parquet("s3://data/sales/**/*.parquet")
 |> Dux.to_rows()
 ```
 
+## Performance
+
+Dux pipelines compile to SQL and execute inside DuckDB — no data crosses into Elixir until you materialise. On a 10M-row dataset (Apple M3 Max, 36GB):
+
+| Operation | Dux | Explorer (Polars) | Ratio |
+|-----------|-----|-------------------|-------|
+| Filter (10M rows) | 41ms | 13ms | 3.1x |
+| Mutate (10M rows) | ~40ms | ~14ms | ~3x |
+| Group + Summarise | ~12ms | ~21ms | **0.6x** |
+| Memory per compute | 5-10 KB | 5-10 KB | ~same |
+
+Dux is within 3x of Polars for single-node operations and **faster for aggregations** (DuckDB's columnar engine). The gap narrows further at scale — Dux can distribute across machines while Polars is single-node.
+
 ## Design
 
 Dux is the successor to [Explorer](https://github.com/elixir-explorer/explorer). That means it borrows its verb design from dplyr and the tidyverse — constrained, composable operations that each do one thing well. If you've used `dplyr::filter()`, `mutate()`, `group_by() |> summarise()`, the Dux API will feel familiar.
@@ -180,6 +193,7 @@ Lazy pipelines render with source provenance, operations, and generated SQL. Com
 - [Transformations](https://hexdocs.pm/dux/transformations.html) — filter, mutate, window functions
 - [Joins & Reshape](https://hexdocs.pm/dux/joins-and-reshape.html) — join types, ASOF joins, pivots
 - [Distributed Execution](https://hexdocs.pm/dux/distributed.html) — architecture, partitioning, distributed IO
+- [FLAME Clusters](https://hexdocs.pm/dux/flame-clusters.html) — ad-hoc Spark-like clusters with Fly.io
 - [Graph Analytics](https://hexdocs.pm/dux/graph-analytics.html) — PageRank, shortest paths, components
 - [Cheatsheet](https://hexdocs.pm/dux/cheatsheet.html) — quick reference for all verbs
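
Note on the Performance table in the README change: the Ratio column appears to be Dux time divided by Explorer time, so values below 1 favour Dux. A minimal sketch that recomputes it from the table's timings, treating the approximate (~) figures as exact. The variable names are illustrative and not part of Dux.

```elixir
# Timings in milliseconds, copied from the README table.
# Assumption: the "~" values are treated as exact for this check.
timings = [
  {"Filter", 41, 13},
  {"Mutate", 40, 14},
  {"Group + Summarise", 12, 21}
]

# Ratio = Dux time / Explorer time, rounded to two decimals.
ratios =
  for {operation, dux_ms, explorer_ms} <- timings do
    {operation, Float.round(dux_ms / explorer_ms, 2)}
  end

IO.inspect(ratios)
```

The first row works out to 3.15, which the table reports as 3.1x; the other two rows are consistent with the rounded ~3x and 0.6x figures.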

guides/cheatsheet.cheatmd

Lines changed: 26 additions & 2 deletions
@@ -35,6 +35,7 @@ Dux.drop_secret(:s3)
 ### From SQL
 ```elixir
 Dux.from_query("SELECT * FROM range(100) t(x)")
+Dux.exec("SET threads = 8") # raw DDL/DML
 ```
 
 ## Filtering
@@ -101,6 +102,13 @@ Dux.slice(df, 5, 10) # offset 5, take 10
 Dux.distinct(df) # deduplicate all columns
 ```
 
+### Grouping
+```elixir
+Dux.group_by(df, :region) # set groups
+Dux.group_by(df, [:region, :year]) # multi-column
+Dux.ungroup(df) # clear groups
+```
+
 ## Aggregation
 
 ### Group + Summarise
@@ -223,6 +231,17 @@ Dux.sql_preview(df) # → SQL string
 Dux.sql_preview(df, pretty: true) # → formatted SQL
 ```
 
+## SQL Macros
+
+```elixir
+# Reusable SQL functions — fully lazy, zero overhead
+Dux.define(:double, [:x], "x * 2")
+Dux.define(:risk, [:score], "CASE WHEN score > 0.8 THEN 'high' ELSE 'low' END")
+Dux.define_table(:date_spine, [:s, :e], "SELECT * FROM generate_series(s::DATE, e::DATE, INTERVAL 1 DAY) t(d)")
+Dux.undefine(:double)
+Dux.list_macros()
+```
+
 ## Distributed
 
 ### Reads
@@ -260,8 +279,13 @@ df |> Dux.distribute(workers) |> Dux.collect()
 
 ### FLAME: elastic cloud compute
 ```elixir
-Dux.Flame.start_pool(backend: {FLAME.FlyBackend, ...}, max: 10)
-workers = Dux.Flame.spin_up(5)
+workers = Dux.Flame.spin_up(5,
+  pool: :dux_pool,
+  memory_limit: "4GB",
+  temp_directory: "/tmp/dux_spill"
+)
+Dux.distribute(df, workers) |> Dux.compute()
+Dux.local(df) # back to single-node
 ```
 
 ## Graph Analytics
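
Note on the `date_spine` table macro added in the SQL Macros hunk: it wraps DuckDB's `generate_series` over dates, which includes both endpoints. A rough pure-Elixir analogue of the rows it should produce, using only the standard library. The dates are made up for illustration, and this sketch does not call Dux or DuckDB.

```elixir
# Pure-Elixir analogue of date_spine(s, e): one entry per day, both ends inclusive.
# The start and end dates are arbitrary examples.
start_date = ~D[2024-01-30]
end_date = ~D[2024-02-02]

spine = start_date |> Date.range(end_date) |> Enum.to_list()

IO.inspect(spine)
```

Like `generate_series`, `Date.range/2` includes the end date, so a four-day window yields four entries and crosses the month boundary cleanly.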

guides/flame-clusters.livemd

Lines changed: 271 additions & 0 deletions
@@ -0,0 +1,271 @@
+# FLAME Clusters: Ad-Hoc Spark on the BEAM
+
+```elixir
+Mix.install([
+  {:dux, "~> 0.2.0"},
+  {:kino_dux, "~> 0.1"},
+  {:flame, "~> 0.5"}
+])
+```
+
+## Overview
+
+This guide walks through building an ad-hoc distributed compute cluster
+using [FLAME](https://github.com/phoenixframework/flame) and
+[Fly.io](https://fly.io). We'll query the
+[Ookla Speedtest](https://registry.opendata.aws/speedtest-global-performance/)
+open dataset — ~20GB of global internet speed measurements stored as
+Parquet on S3.
+
+Each FLAME runner boots a fresh machine with its own DuckDB, reads S3
+data directly, and auto-terminates when idle. Think of it as Spark-style
+elastic compute, but on the BEAM — no JVM, no YARN, no cluster manager.
+
+**Prerequisites:**
+- A Fly.io account with a `FLY_API_TOKEN`
+- This notebook running on a Fly.io Livebook instance
+
+## The Dataset
+
+[Ookla](https://www.ookla.com/ookla-for-good/open-data) publishes
+quarterly internet speed test data as open Parquet files:
+
+```
+s3://ookla-open-data/parquet/performance/
+  type={fixed,mobile}/
+    year={2019..2025}/
+      quarter={1..4}/
+        *.parquet
+```
+
+~56 files, Hive-partitioned by connection type, year, and quarter.
+Each file contains millions of tile-level measurements: download/upload
+speeds, latency, test counts, and geographic quadkeys.
+
+The data is **public — no S3 credentials needed**.
+
+## 1. Configure Anonymous S3 Access
+
+DuckDB reads S3 via the `httpfs` extension. For public buckets, we
+use the credential chain provider which falls back to unsigned requests.
+
+```elixir
+Dux.exec("INSTALL httpfs; LOAD httpfs")
+Dux.create_secret(:ookla, type: :s3, provider: :credential_chain, region: "us-west-2")
+```
+
+## 2. Explore Locally First
+
+Before spinning up a cluster, let's look at a single quarter to
+understand the data.
+
+```elixir
+one_quarter =
+  Dux.from_parquet(
+    "s3://ookla-open-data/parquet/performance/type=fixed/year=2024/quarter=4/*.parquet",
+    hive_partitioning: true
+  )
+
+one_quarter
+|> Dux.head(5)
+|> Dux.to_rows()
+```
+
+```elixir
+# How big is one quarter?
+one_quarter |> Dux.n_rows()
+```
+
+```elixir
+# Speed distribution
+one_quarter
+|> Dux.mutate_with(download_mbps: "avg_d_kbps / 1000.0")
+|> Dux.summarise_with(
+  median_down: "MEDIAN(download_mbps)",
+  p95_down: "PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY download_mbps)",
+  total_tests: "SUM(tests)",
+  total_devices: "SUM(devices)"
+)
+|> Dux.to_rows()
+```
+
+## 3. Start the FLAME Pool
+
+Now let's scale out. The pool configuration controls the machines FLAME boots.
+
+```elixir
+Kino.start_child!(
+  {FLAME.Pool,
+   name: :dux_pool,
+   code_sync: [
+     start_apps: true,
+     sync_beams: [Path.join(System.tmp_dir!(), "livebook_runtime")]
+   ],
+   min: 0,
+   max: 10,
+   max_concurrency: 1,
+   backend: {FLAME.FlyBackend,
+     cpu_kind: "performance",
+     cpus: 4,
+     memory_mb: 8192,
+     token: System.fetch_env!("FLY_API_TOKEN"),
+     env: %{"LIVEBOOK_COOKIE" => Atom.to_string(Node.get_cookie())}
+   },
+   boot_timeout: 120_000,
+   idle_shutdown_after: :timer.minutes(5)}
+)
+```
+
+Key settings:
+- **`max_concurrency: 1`** — one DuckDB per machine. DuckDB saturates cores internally.
+- **`memory_mb: 8192`** — 8GB per worker. DuckDB spills to `/tmp` if needed.
+- **`idle_shutdown_after: 5 min`** — machines auto-terminate. You pay only for active compute.
+
+## 4. Spin Up Workers
+
+```elixir
+workers = Dux.Flame.spin_up(5,
+  pool: :dux_pool,
+  memory_limit: "4GB",
+  setup: fn ->
+    # Each worker needs httpfs + S3 access configured
+    Dux.exec("INSTALL httpfs; LOAD httpfs")
+    Dux.create_secret(:ookla, type: :s3, provider: :credential_chain, region: "us-west-2")
+  end
+)
+
+IO.puts("#{length(workers)} workers ready")
+```
+
+## 5. Query the Full Dataset
+
+Now read **all years of fixed broadband data** across the cluster.
+Each worker reads its assigned Parquet files directly from S3 —
+no data flows through your machine.
+
+```elixir
+all_fixed =
+  Dux.from_parquet(
+    "s3://ookla-open-data/parquet/performance/type=fixed/year=*/quarter=*/*.parquet",
+    hive_partitioning: true
+  )
+
+# Global broadband trends by year
+trends =
+  all_fixed
+  |> Dux.distribute(workers)
+  |> Dux.mutate_with(
+    download_mbps: "avg_d_kbps / 1000.0",
+    upload_mbps: "avg_u_kbps / 1000.0"
+  )
+  |> Dux.group_by(:year)
+  |> Dux.summarise_with(
+    median_download: "MEDIAN(download_mbps)",
+    median_upload: "MEDIAN(upload_mbps)",
+    median_latency: "MEDIAN(avg_lat_ms)",
+    total_tests: "SUM(tests)",
+    total_devices: "SUM(devices)"
+  )
+  |> Dux.sort_by(:year)
+  |> Dux.collect()
+  |> Dux.to_rows()
+```
+
+## 6. Compare Fixed vs Mobile
+
+Query both connection types in one pipeline using SQL macros.
+
+```elixir
+Dux.define(:speed_tier, [:mbps], """
+  CASE
+    WHEN mbps >= 100 THEN 'fast (100+ Mbps)'
+    WHEN mbps >= 25 THEN 'moderate (25-100 Mbps)'
+    WHEN mbps >= 10 THEN 'slow (10-25 Mbps)'
+    ELSE 'very slow (<10 Mbps)'
+  END
+""")
+
+all_data =
+  Dux.from_parquet(
+    "s3://ookla-open-data/parquet/performance/type=*/year=2024/quarter=*/*.parquet",
+    hive_partitioning: true
+  )
+
+speed_distribution =
+  all_data
+  |> Dux.distribute(workers)
+  |> Dux.mutate_with(
+    download_mbps: "avg_d_kbps / 1000.0",
+    tier: "speed_tier(avg_d_kbps / 1000.0)"
+  )
+  |> Dux.group_by([:type, "tier"])
+  |> Dux.summarise_with(
+    tiles: "COUNT(*)",
+    total_tests: "SUM(tests)"
+  )
+  |> Dux.sort_by([:type, :tiles])
+  |> Dux.collect()
+  |> Dux.to_rows()
+```
+
+## 7. Heavy Aggregation: Latency by Quadkey Prefix
+
+Quadkeys encode geographic tiles. The first few characters identify
+the region. Let's find the areas with the worst latency.
+
+```elixir
+worst_latency =
+  all_fixed
+  |> Dux.distribute(workers)
+  |> Dux.filter_with("tests >= 10")
+  |> Dux.mutate_with(region: "LEFT(quadkey, 6)")
+  |> Dux.group_by("region")
+  |> Dux.summarise_with(
+    avg_latency: "AVG(avg_lat_ms)",
+    total_tests: "SUM(tests)",
+    n_tiles: "COUNT(*)"
+  )
+  |> Dux.filter_with("total_tests > 1000")
+  |> Dux.sort_by(desc: :avg_latency)
+  |> Dux.head(20)
+  |> Dux.collect()
+  |> Dux.to_rows()
+```
+
+## 8. Writing Results
+
+Distributed writes go directly from workers to S3.
+
+```elixir
+# Write the aggregated trends back to your own bucket
+# (uncomment and set your bucket)
+
+# all_fixed
+# |> Dux.distribute(workers)
+# |> Dux.mutate_with(download_mbps: "avg_d_kbps / 1000.0")
+# |> Dux.to_parquet("s3://your-bucket/ookla-processed/", partition_by: [:year])
+```
+
+## 9. Cleanup
+
+Workers auto-terminate after the idle timeout. To shut down immediately:
+
+```elixir
+Enum.each(workers, &GenServer.stop/1)
+IO.puts("Workers stopped. FLAME runners will terminate shortly.")
+```
+
+## What Just Happened
+
+You built a 5-machine compute cluster from a Livebook notebook.
+Each machine:
+
+1. Booted in ~30s via FLAME + Fly.io
+2. Got a full copy of your notebook's compiled code
+3. Started its own DuckDB with 4 cores and 8GB RAM
+4. Read its assigned Parquet files directly from S3
+5. Executed filter + group + aggregate locally
+6. Sent small aggregated results back to the coordinator
+7. Auto-terminated after 5 minutes idle
+
+No infrastructure to manage. No cluster to maintain. Just notebooks and queries.
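
Note on the dataset layout described in the guide: two connection types, seven years, and four quarters give the ~56 partitions it mentions. The sketch below enumerates those partition globs and deals them across five workers round-robin. The round-robin split is purely an assumption for illustration; the diff does not show how Dux actually assigns files to workers.

```elixir
# Enumerate the Hive partition globs implied by the layout in the guide:
# type={fixed,mobile} / year={2019..2025} / quarter={1..4}.
base = "s3://ookla-open-data/parquet/performance"

paths =
  for type <- ["fixed", "mobile"], year <- 2019..2025, quarter <- 1..4 do
    "#{base}/type=#{type}/year=#{year}/quarter=#{quarter}/*.parquet"
  end

# Hypothetical assignment: deal partitions to 5 workers round-robin.
# This is NOT taken from Dux; it only shows how evenly 56 units split.
worker_count = 5

assignment =
  paths
  |> Enum.with_index()
  |> Enum.group_by(
    fn {_path, index} -> rem(index, worker_count) end,
    fn {path, _index} -> path end
  )

sizes = for worker <- 0..(worker_count - 1), do: length(assignment[worker])

IO.inspect(length(paths), label: "partitions")
IO.inspect(sizes, label: "partitions per worker")
```

Under this assumed split, one worker handles 12 partitions and the other four handle 11 each, so no single machine is far ahead of or behind the rest.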

mix.exs

Lines changed: 1 addition & 0 deletions
@@ -81,6 +81,7 @@ defmodule Dux.MixProject do
 "guides/transformations.livemd",
 "guides/joins-and-reshape.livemd",
 "guides/distributed.md",
+"guides/flame-clusters.livemd",
 "guides/graph-analytics.livemd",
 "guides/cheatsheet.cheatmd",
 "CHANGELOG.md"
