Commit 5d912c1

Merge pull request #6 from trouze/feat/cost-overlay
replay: Snowflake cost overlay (--warehouse-size)
2 parents 9e3161c + 7743815 commit 5d912c1

7 files changed

Lines changed: 574 additions & 23 deletions

File tree

.actrc

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+-P ubuntu-latest=catthehacker/ubuntu:act-latest
+--container-architecture linux/amd64

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format
 
 ## [Unreleased]
 
+### Added
+
+- `replay` cost overlay: pass `--warehouse-size` (Snowflake XS…6XL) or `--credits-per-hour` (non-Snowflake adapters) to translate wall-clock into dollars. Renders **Run cost**, **Critical-path floor**, **Headroom** (= run − floor; the prize for better parallelization), and **Idle cost** (the $ equivalent of thread-idle warehouse-seconds). Defaults to $2.00/credit (Snowflake Standard On-Demand); override with `--rate-per-credit`. Snowflake's 60-second minimum-billing floor is applied automatically; pass `--no-minimum-billing` to see raw wall-clock × rate.
+- New module `dbt_dag_opt.cost` with `CostInputs`, `CostReport`, `compute_cost()`, `credits_per_hour_for()`, and `cost_inputs_from_replay()`. Designed primitive-first so a future `whatif` simulator can call `compute_cost` against simulated schedules and diff the resulting `CostReport`s.
+
 ## [0.1.0] - 2026-04-24
 
 Initial PyPI release. Complete rewrite of the pre-release prototype.
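
Reviewer note: the changelog entry describes a primitive-first design in which a future `whatif` simulator diffs two `CostReport`s. A minimal sketch of what such a diff might look like, assuming a trimmed-down stand-in for `CostReport`. The names `MiniCostReport` and `diff_reports` are hypothetical and are not part of this commit:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class MiniCostReport:
    """Hypothetical stand-in carrying only the dollar fields of CostReport."""

    run_cost_usd: float
    floor_cost_usd: float
    headroom_usd: float
    idle_cost_usd: float


def diff_reports(baseline: MiniCostReport, simulated: MiniCostReport) -> dict[str, float]:
    """Return simulated minus baseline per dollar field (negative means savings)."""
    fields = ("run_cost_usd", "floor_cost_usd", "headroom_usd", "idle_cost_usd")
    return {name: getattr(simulated, name) - getattr(baseline, name) for name in fields}


# Made-up numbers: the simulated schedule shortens the run but not the critical path.
baseline = MiniCostReport(run_cost_usd=4.0, floor_cost_usd=2.5, headroom_usd=1.5, idle_cost_usd=1.0)
simulated = MiniCostReport(run_cost_usd=3.0, floor_cost_usd=2.5, headroom_usd=0.5, idle_cost_usd=0.25)
delta = diff_reports(baseline, simulated)
print(delta["run_cost_usd"])  # -1.0
```

Because the floor is unchanged in this example, the whole saving shows up as reduced headroom, which is the signal a what-if loop would presumably want to surface.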

README.md

Lines changed: 26 additions & 3 deletions
@@ -89,9 +89,33 @@ dbt-dag-opt replay [OPTIONS]
   --account-id / --job-id   dbt Cloud mode (same as analyze)
   -f, --format [text|json]  Output format [default: text]
   --top-idle-gaps INTEGER   How many idle gaps to surface [default: 10]
+  --warehouse-size TEXT     Snowflake size (XS, S, M, L, XL, 2XL…6XL); triggers cost overlay
+  --credits-per-hour FLOAT  Raw credits/hour for non-Snowflake adapters
+  --rate-per-credit FLOAT   USD per credit [default: 2.0 (Standard On-Demand)]
+  --no-minimum-billing      Skip the 60s Snowflake minimum-billing floor
   -o, --output PATH         Write output to a file instead of stdout
 ```
 
+#### Cost overlay
+
+Pass `--warehouse-size` to translate wall-clock into dollars:
+
+```bash
+dbt-dag-opt replay \
+  --manifest target/manifest.json \
+  --run-results target/run_results.json \
+  --warehouse-size L
+```
+
+Four numbers frame the output:
+
+- **Run cost** — what this run actually billed (wall-clock × warehouse rate, with the 60s floor applied).
+- **Critical-path floor** — the irreducible cost of your slowest dependency chain. You can't beat this without making individual models faster.
+- **Headroom** = `run − floor`. The prize for better parallelization: what you could save if threads never sat idle.
+- **Idle cost** — the $ equivalent of thread-idle warehouse-seconds. Distinct from headroom: idle cost includes time spent waiting on long-tail critical-path models that can't be parallelized away.
+
+Defaults to $2.00/credit (Snowflake Standard On-Demand). Override with `--rate-per-credit` (Enterprise ≈ 3.0, Business Critical ≈ 4.0; check your contract). Non-Snowflake adapters: pass `--credits-per-hour N` instead of `--warehouse-size`.
+
 ### Output formats
 
 - `table` — rich terminal table (default for `analyze`).
@@ -110,11 +134,10 @@ Distances sum the execution time of every node along the path — that's the war
 
 ## What this is / isn't
 
-It **is** a CLI tool that points at the slowest chains in your DAG and — as of `replay` — the observed schedule that those chains actually produced.
+It **is** a CLI tool that points at the slowest chains in your DAG, reconstructs the observed schedule those chains produced (`replay`), and — with `--warehouse-size` — translates that schedule into dollars.
 
 It **isn't** (yet):
-- A predictive scheduler simulator. `replay` reconstructs what already happened; it doesn't yet project what would happen under a different `--threads N` or if you sped up a specific model. That "what-if" loop is planned next.
-- A cost model. Multiplying wall-clock × your warehouse rate is on you — a `--warehouse-size` flag is planned alongside the what-if loop.
+- A predictive scheduler simulator. `replay` reconstructs what already happened; it doesn't yet project what would happen under a different `--threads N` or if you sped up a specific model. That "what-if" loop is planned next, and will diff two cost reports to show projected $ savings.
 
 ## Development
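
Reviewer note: the four README numbers are easier to follow with concrete arithmetic. The sketch below re-derives them for a hypothetical run using the formulas from `cost.py` in this commit. The timings (600 s wall-clock, 420 s critical path, 4 threads, 1500 busy thread-seconds) are invented for illustration. Both durations exceed 60 s, so the minimum-billing floor does not change anything here.

```python
# Hypothetical run; only the rate constants come from the commit.
CREDITS_PER_HOUR_L = 8.0     # Snowflake "L" in SNOWFLAKE_CREDITS_PER_HOUR
RATE_PER_CREDIT_USD = 2.0    # DEFAULT_RATE_PER_CREDIT_USD

wall_clock_s = 600.0         # assumed observed run duration
critical_path_s = 420.0      # assumed slowest dependency chain
threads = 4                  # assumed dbt thread count
busy_thread_s = 1500.0       # assumed sum of model execution times

rate_per_second = CREDITS_PER_HOUR_L * RATE_PER_CREDIT_USD / 3600.0

run_cost = wall_clock_s * rate_per_second
floor_cost = critical_path_s * rate_per_second
headroom = max(0.0, run_cost - floor_cost)

# Idle cost scales the run cost by the share of thread-seconds spent idle.
warehouse_s = wall_clock_s * threads
idle_fraction = (warehouse_s - busy_thread_s) / warehouse_s
idle_cost = run_cost * idle_fraction

print(f"run=${run_cost:.2f} floor=${floor_cost:.2f} "
      f"headroom=${headroom:.2f} idle=${idle_cost:.2f}")
# run=$2.67 floor=$1.87 headroom=$0.80 idle=$1.00
```

In this example idle cost ($1.00) is larger than headroom ($0.80), which matches the README's point that some thread idleness sits behind critical-path models and cannot be parallelized away.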

src/dbt_dag_opt/cli.py

Lines changed: 61 additions & 1 deletion
@@ -10,6 +10,11 @@
 import typer
 
 from dbt_dag_opt import __version__, artifacts
+from dbt_dag_opt.cost import (
+    DEFAULT_RATE_PER_CREDIT_USD,
+    compute_cost,
+    cost_inputs_from_replay,
+)
 from dbt_dag_opt.errors import DbtDagOptError
 from dbt_dag_opt.formatters import Format, render
 from dbt_dag_opt.graph import build_dag
@@ -181,12 +186,57 @@ def replay(
             help="Number of idle gaps to surface. Use 0 to suppress.",
         ),
     ] = 10,
+    warehouse_size: Annotated[
+        str | None,
+        typer.Option(
+            "--warehouse-size",
+            help="Snowflake warehouse size (XS, S, M, L, XL, 2XL...6XL). Triggers cost overlay.",
+        ),
+    ] = None,
+    credits_per_hour: Annotated[
+        float | None,
+        typer.Option(
+            "--credits-per-hour",
+            help=(
+                "Raw credits/hour for non-Snowflake adapters. "
+                "Mutually exclusive with --warehouse-size."
+            ),
+        ),
+    ] = None,
+    rate_per_credit: Annotated[
+        float,
+        typer.Option(
+            "--rate-per-credit",
+            help="USD per credit. Default 2.0 (Snowflake Standard On-Demand).",
+        ),
+    ] = DEFAULT_RATE_PER_CREDIT_USD,
+    no_minimum_billing: Annotated[
+        bool,
+        typer.Option(
+            "--no-minimum-billing",
+            help="Skip the 60s minimum-billing floor that Snowflake applies on warehouse resume.",
+        ),
+    ] = False,
     output: Annotated[
         Path | None,
         typer.Option("--output", "-o", help="Write output to a file instead of stdout."),
     ] = None,
 ) -> None:
     """Replay an observed run: per-thread utilization, critical path, and idle-gap attribution."""
+    if warehouse_size is not None and credits_per_hour is not None:
+        raise typer.BadParameter(
+            "--warehouse-size and --credits-per-hour are mutually exclusive."
+        )
+    cost_requested = warehouse_size is not None or credits_per_hour is not None
+    if not cost_requested and rate_per_credit != DEFAULT_RATE_PER_CREDIT_USD:
+        raise typer.BadParameter(
+            "--rate-per-credit requires --warehouse-size or --credits-per-hour."
+        )
+    if not cost_requested and no_minimum_billing:
+        raise typer.BadParameter(
+            "--no-minimum-billing requires --warehouse-size or --credits-per-hour."
+        )
+
     try:
         data = _load(
             manifest=manifest,
@@ -199,11 +249,21 @@ def replay(
         )
         limit = top_idle_gaps if top_idle_gaps > 0 else 0
         report = build_replay(data, top_idle_gaps_limit=limit)
+        cost_report = None
+        if cost_requested:
+            cost_inputs = cost_inputs_from_replay(
+                report,
+                warehouse_size=warehouse_size,
+                credits_per_hour=credits_per_hour,
+                rate_per_credit_usd=rate_per_credit,
+                apply_minimum_billing=not no_minimum_billing,
+            )
+            cost_report = compute_cost(cost_inputs)
     except DbtDagOptError as exc:
         typer.secho(f"error: {exc}", fg=typer.colors.RED, err=True)
         raise typer.Exit(code=1) from exc
 
-    rendered = render_replay(report, fmt)
+    rendered = render_replay(report, fmt, cost=cost_report)
 
     if output is not None:
         output.write_text(rendered, encoding="utf-8")
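
Reviewer note: the flag checks added to `replay` can be exercised without Typer by lifting them into a pure function. The sketch below mirrors the three guards from the diff. `validate_cost_flags` is a hypothetical helper (not in this commit), and it raises `ValueError` where the CLI raises `typer.BadParameter`.

```python
from __future__ import annotations

DEFAULT_RATE = 2.0  # mirrors DEFAULT_RATE_PER_CREDIT_USD from cost.py


def validate_cost_flags(
    warehouse_size: str | None,
    credits_per_hour: float | None,
    rate_per_credit: float = DEFAULT_RATE,
    no_minimum_billing: bool = False,
) -> bool:
    """Return True when the cost overlay is requested; raise on invalid combinations."""
    if warehouse_size is not None and credits_per_hour is not None:
        raise ValueError("--warehouse-size and --credits-per-hour are mutually exclusive.")
    cost_requested = warehouse_size is not None or credits_per_hour is not None
    # The modifier flags only make sense once a size or credits/hour is given.
    if not cost_requested and rate_per_credit != DEFAULT_RATE:
        raise ValueError("--rate-per-credit requires --warehouse-size or --credits-per-hour.")
    if not cost_requested and no_minimum_billing:
        raise ValueError("--no-minimum-billing requires --warehouse-size or --credits-per-hour.")
    return cost_requested


print(validate_cost_flags("L", None))    # True
print(validate_cost_flags(None, None))   # False
```

One consequence of comparing against the default value: an explicit `--rate-per-credit 2.0` without a size flag is indistinguishable from omitting the flag, so it passes silently. That seems harmless, since no cost overlay is rendered in that case.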

src/dbt_dag_opt/cost.py

Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
+"""Snowflake-style cost model layered on top of a ``ReplayReport``.
+
+Translates wall-clock, thread-idleness, and the critical-path floor into
+dollar amounts the user can reason about. Kept intentionally small and
+primitive-driven so a future ``whatif`` simulator can call ``compute_cost``
+twice (baseline + simulated) and diff the two ``CostReport``s without
+having to fabricate a full replay.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from dbt_dag_opt.errors import InvalidArtifactError
+from dbt_dag_opt.replay import ReplayReport
+
+SNOWFLAKE_CREDITS_PER_HOUR: dict[str, float] = {
+    "XS": 1.0,
+    "S": 2.0,
+    "M": 4.0,
+    "L": 8.0,
+    "XL": 16.0,
+    "2XL": 32.0,
+    "3XL": 64.0,
+    "4XL": 128.0,
+    "5XL": 256.0,
+    "6XL": 512.0,
+}
+
+DEFAULT_RATE_PER_CREDIT_USD: float = 2.0
+MIN_BILLING_SECONDS: float = 60.0
+
+_SIZE_ALIASES: dict[str, str] = {
+    "XSMALL": "XS",
+    "SMALL": "S",
+    "MEDIUM": "M",
+    "LARGE": "L",
+    "XLARGE": "XL",
+    "2XLARGE": "2XL",
+    "3XLARGE": "3XL",
+    "4XLARGE": "4XL",
+    "5XLARGE": "5XL",
+    "6XLARGE": "6XL",
+}
+
+
+@dataclass(frozen=True)
+class CostInputs:
+    """Primitive inputs to the cost model.
+
+    Decoupled from ``ReplayReport`` so ``whatif`` can pass simulated numbers
+    without going through the full parse pipeline.
+    """
+
+    wall_clock_seconds: float
+    total_cpu_seconds: float
+    thread_count: int
+    critical_path_seconds: float
+    credits_per_hour: float
+    rate_per_credit_usd: float
+    apply_minimum_billing: bool = True
+    warehouse_size_label: str | None = None
+
+
+@dataclass(frozen=True)
+class CostReport:
+    inputs: CostInputs
+    billed_seconds: float
+    rate_per_second_usd: float
+    run_cost_usd: float
+    floor_cost_usd: float
+    headroom_usd: float
+    idle_thread_seconds: float
+    waste_fraction: float
+    idle_cost_usd: float
+    min_billing_applied: bool
78+
79+
def credits_per_hour_for(size: str) -> float:
80+
"""Resolve a Snowflake warehouse size alias to credits/hour.
81+
82+
Case-insensitive; strips ``-`` and whitespace. Accepts both shorthand
83+
(``"L"``, ``"2XL"``) and long form (``"Large"``, ``"2X-Large"``).
84+
"""
85+
normalized = size.upper().replace("-", "").replace(" ", "").replace("_", "")
86+
if normalized in SNOWFLAKE_CREDITS_PER_HOUR:
87+
return SNOWFLAKE_CREDITS_PER_HOUR[normalized]
88+
if normalized in _SIZE_ALIASES:
89+
return SNOWFLAKE_CREDITS_PER_HOUR[_SIZE_ALIASES[normalized]]
90+
raise InvalidArtifactError(
91+
f"unknown warehouse size: {size!r}. "
92+
f"Expected one of {sorted(SNOWFLAKE_CREDITS_PER_HOUR)}."
93+
)
+
+
+def compute_cost(inputs: CostInputs) -> CostReport:
+    rate_per_second = inputs.credits_per_hour * inputs.rate_per_credit_usd / 3600.0
+
+    raw_wall = max(0.0, inputs.wall_clock_seconds)
+    if inputs.apply_minimum_billing and raw_wall < MIN_BILLING_SECONDS:
+        billed = MIN_BILLING_SECONDS
+        min_billing_applied = True
+    else:
+        billed = raw_wall
+        min_billing_applied = False
+
+    run_cost = billed * rate_per_second
+
+    raw_floor = max(0.0, inputs.critical_path_seconds)
+    if inputs.apply_minimum_billing:
+        floor_seconds = max(raw_floor, MIN_BILLING_SECONDS)
+    else:
+        floor_seconds = raw_floor
+    floor_cost = floor_seconds * rate_per_second
+
+    warehouse_seconds = raw_wall * max(1, inputs.thread_count)
+    idle_thread_seconds = max(0.0, warehouse_seconds - max(0.0, inputs.total_cpu_seconds))
+    waste_fraction = (
+        idle_thread_seconds / warehouse_seconds if warehouse_seconds > 0 else 0.0
+    )
+    idle_cost = run_cost * waste_fraction
+    headroom = max(0.0, run_cost - floor_cost)
+
+    return CostReport(
+        inputs=inputs,
+        billed_seconds=billed,
+        rate_per_second_usd=rate_per_second,
+        run_cost_usd=run_cost,
+        floor_cost_usd=floor_cost,
+        headroom_usd=headroom,
+        idle_thread_seconds=idle_thread_seconds,
+        waste_fraction=waste_fraction,
+        idle_cost_usd=idle_cost,
+        min_billing_applied=min_billing_applied,
+    )
+
+
+def cost_inputs_from_replay(
+    report: ReplayReport,
+    *,
+    warehouse_size: str | None,
+    credits_per_hour: float | None,
+    rate_per_credit_usd: float,
+    apply_minimum_billing: bool,
+) -> CostInputs:
+    """Adapt a ``ReplayReport`` + user flags into ``CostInputs``.
+
+    Exactly one of ``warehouse_size`` / ``credits_per_hour`` must be set;
+    the caller (CLI) enforces this.
+    """
+    if warehouse_size is not None and credits_per_hour is not None:
+        raise InvalidArtifactError(
+            "pass either warehouse_size or credits_per_hour, not both"
+        )
+    if warehouse_size is not None:
+        cph = credits_per_hour_for(warehouse_size)
+        label = warehouse_size.upper().replace("-", "").replace(" ", "").replace("_", "")
+        if label in _SIZE_ALIASES:
+            label = _SIZE_ALIASES[label]
+    elif credits_per_hour is not None:
+        if credits_per_hour <= 0:
+            raise InvalidArtifactError("credits_per_hour must be positive")
+        cph = float(credits_per_hour)
+        label = None
+    else:
+        raise InvalidArtifactError(
+            "cost inputs require warehouse_size or credits_per_hour"
+        )
+
+    return CostInputs(
+        wall_clock_seconds=report.wall_clock_seconds,
+        total_cpu_seconds=report.total_cpu_seconds,
+        thread_count=report.thread_count,
+        critical_path_seconds=report.critical_path_seconds,
+        credits_per_hour=cph,
+        rate_per_credit_usd=rate_per_credit_usd,
+        apply_minimum_billing=apply_minimum_billing,
+        warehouse_size_label=label,
+    )
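
Reviewer note: two behaviours in `cost.py` are easy to misread from the diff alone, namely size normalization and the 60-second billing floor. The sketch below reproduces both in isolation, assuming a three-entry subset of the lookup tables. The names `resolve` and `billed_seconds` are hypothetical, and `resolve` raises a plain `KeyError` where the commit raises `InvalidArtifactError`.

```python
CREDITS = {"XS": 1.0, "L": 8.0, "2XL": 32.0}                 # subset of SNOWFLAKE_CREDITS_PER_HOUR
ALIASES = {"XSMALL": "XS", "LARGE": "L", "2XLARGE": "2XL"}   # subset of _SIZE_ALIASES
MIN_BILLING_SECONDS = 60.0


def resolve(size: str) -> float:
    """Normalize a size string the way credits_per_hour_for does, then look it up."""
    key = size.upper().replace("-", "").replace(" ", "").replace("_", "")
    key = ALIASES.get(key, key)  # long form -> shorthand; shorthand passes through
    return CREDITS[key]          # KeyError for unknown sizes in this sketch


def billed_seconds(wall_clock_s: float, apply_minimum: bool = True) -> float:
    """Clamp negative durations to zero, then apply the 60 s floor if enabled."""
    raw = max(0.0, wall_clock_s)
    return max(raw, MIN_BILLING_SECONDS) if apply_minimum else raw


print(resolve("2X-Large"))                         # 32.0
print(resolve("x_small"))                          # 1.0
print(billed_seconds(20.0))                        # 60.0
print(billed_seconds(20.0, apply_minimum=False))   # 20.0
```

So a 20-second run on an XS warehouse at the default $2.00/credit is billed as 60 s, i.e. 60 × 1 × 2 / 3600 ≈ $0.033, three times what the raw wall-clock would suggest. That gap is what `--no-minimum-billing` exposes.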
