Merged
2 changes: 2 additions & 0 deletions .actrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-P ubuntu-latest=catthehacker/ubuntu:act-latest
--container-architecture linux/amd64
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format

## [Unreleased]

### Added

- `replay` cost overlay: pass `--warehouse-size` (Snowflake XS…6XL) or `--credits-per-hour` (non-Snowflake adapters) to translate wall-clock into dollars. Renders **Run cost**, **Critical-path floor**, **Headroom** (= run − floor; the prize for better parallelization), and **Idle cost** (the $ equivalent of thread-idle warehouse-seconds). Defaults to $2.00/credit (Snowflake Standard On-Demand); override with `--rate-per-credit`. Snowflake's 60-second minimum-billing floor is applied automatically; pass `--no-minimum-billing` to see raw wall-clock × rate.
- New module `dbt_dag_opt.cost` with `CostInputs`, `CostReport`, `compute_cost()`, `credits_per_hour_for()`, and `cost_inputs_from_replay()`. Designed primitive-first so a future `whatif` simulator can call `compute_cost` against simulated schedules and diff the resulting `CostReport`s.

## [0.1.0] - 2026-04-24

Initial PyPI release. Complete rewrite of the pre-release prototype.
29 changes: 26 additions & 3 deletions README.md
@@ -89,9 +89,33 @@ dbt-dag-opt replay [OPTIONS]
  --account-id / --job-id     dbt Cloud mode (same as analyze)
  -f, --format [text|json]    Output format  [default: text]
  --top-idle-gaps INTEGER     How many idle gaps to surface  [default: 10]
  --warehouse-size TEXT       Snowflake size (XS, S, M, L, XL, 2XL…6XL); triggers cost overlay
  --credits-per-hour FLOAT    Raw credits/hour for non-Snowflake adapters
  --rate-per-credit FLOAT     USD per credit  [default: 2.0 (Standard On-Demand)]
  --no-minimum-billing        Skip the 60s Snowflake minimum-billing floor
  -o, --output PATH           Write output to a file instead of stdout
```

#### Cost overlay

Pass `--warehouse-size` to translate wall-clock into dollars:

```bash
dbt-dag-opt replay \
  --manifest target/manifest.json \
  --run-results target/run_results.json \
  --warehouse-size L
```

Four numbers frame the output:

- **Run cost** — what this run actually billed (wall-clock × warehouse rate, with the 60s floor applied).
- **Critical-path floor** — the irreducible cost of your slowest dependency chain. You can't beat this without making individual models faster.
- **Headroom** — `run − floor`. The prize for better parallelization: what you could save if threads never sat idle.
- **Idle cost** — the $ equivalent of thread-idle warehouse-seconds. Distinct from headroom: idle cost includes time spent waiting on long-tail critical-path models that can't be parallelized away.

Defaults to $2.00/credit (Snowflake Standard On-Demand). Override with `--rate-per-credit` (Enterprise ≈ 3.0, Business Critical ≈ 4.0; check your contract). Non-Snowflake adapters: pass `--credits-per-hour N` instead of `--warehouse-size`.
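
To make the four numbers concrete, here is a minimal worked sketch of the arithmetic for a hypothetical run. The run figures (4 threads, 600 s of wall-clock, 1800 s of summed model execution, a 360 s critical path) are invented for illustration; the 8 credits/hour for size L and the $2.00/credit rate are the defaults described above. It is a standalone approximation of the calculation, not a call into the package.

```python
# Assumed constants, mirroring the defaults described in this PR.
CREDITS_PER_HOUR_L = 8.0    # Snowflake size "L"
RATE_PER_CREDIT_USD = 2.0   # Standard On-Demand default
MIN_BILLING_SECONDS = 60.0  # minimum-billing floor

# Hypothetical run, chosen only for illustration.
wall_clock_s = 600.0
critical_path_s = 360.0
thread_count = 4
busy_thread_s = 1800.0  # summed model execution time across all threads

rate_per_second = CREDITS_PER_HOUR_L * RATE_PER_CREDIT_USD / 3600.0

# Run cost and critical-path floor, both subject to the 60s minimum.
run_cost = max(wall_clock_s, MIN_BILLING_SECONDS) * rate_per_second
floor_cost = max(critical_path_s, MIN_BILLING_SECONDS) * rate_per_second
headroom = max(0.0, run_cost - floor_cost)

# Idle cost: the share of thread-seconds in which no model was running,
# priced as the same share of the run cost.
warehouse_thread_s = wall_clock_s * thread_count
idle_thread_s = max(0.0, warehouse_thread_s - busy_thread_s)
waste_fraction = idle_thread_s / warehouse_thread_s
idle_cost = run_cost * waste_fraction

print(f"Run cost            ${run_cost:.2f}")    # $2.67
print(f"Critical-path floor ${floor_cost:.2f}")  # $1.60
print(f"Headroom            ${headroom:.2f}")    # $1.07
print(f"Idle cost           ${idle_cost:.2f}")   # $0.67
```

With these assumed inputs, headroom ($1.07) and idle cost ($0.67) come out different because they are derived differently: headroom depends only on the critical-path length, while idle cost depends on how busy the threads were.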

### Output formats

- `table` — rich terminal table (default for `analyze`).
@@ -110,11 +134,10 @@ Distances sum the execution time of every node along the path — that's the war

## What this is / isn't

-It **is** a CLI tool that points at the slowest chains in your DAG and — as of `replay` — the observed schedule that those chains actually produced.
+It **is** a CLI tool that points at the slowest chains in your DAG, reconstructs the observed schedule those chains produced (`replay`), and — with `--warehouse-size` — translates that schedule into dollars.

It **isn't** (yet):
-- A predictive scheduler simulator. `replay` reconstructs what already happened; it doesn't yet project what would happen under a different `--threads N` or if you sped up a specific model. That "what-if" loop is planned next.
-- A cost model. Multiplying wall-clock × your warehouse rate is on you — a `--warehouse-size` flag is planned alongside the what-if loop.
+- A predictive scheduler simulator. `replay` reconstructs what already happened; it doesn't yet project what would happen under a different `--threads N` or if you sped up a specific model. That "what-if" loop is planned next, and will diff two cost reports to show projected $ savings.

## Development

62 changes: 61 additions & 1 deletion src/dbt_dag_opt/cli.py
@@ -10,6 +10,11 @@
import typer

from dbt_dag_opt import __version__, artifacts
from dbt_dag_opt.cost import (
    DEFAULT_RATE_PER_CREDIT_USD,
    compute_cost,
    cost_inputs_from_replay,
)
from dbt_dag_opt.errors import DbtDagOptError
from dbt_dag_opt.formatters import Format, render
from dbt_dag_opt.graph import build_dag
@@ -181,12 +186,57 @@ def replay(
            help="Number of idle gaps to surface. Use 0 to suppress.",
        ),
    ] = 10,
    warehouse_size: Annotated[
        str | None,
        typer.Option(
            "--warehouse-size",
            help="Snowflake warehouse size (XS, S, M, L, XL, 2XL...6XL). Triggers cost overlay.",
        ),
    ] = None,
    credits_per_hour: Annotated[
        float | None,
        typer.Option(
            "--credits-per-hour",
            help=(
                "Raw credits/hour for non-Snowflake adapters. "
                "Mutually exclusive with --warehouse-size."
            ),
        ),
    ] = None,
    rate_per_credit: Annotated[
        float,
        typer.Option(
            "--rate-per-credit",
            help="USD per credit. Default 2.0 (Snowflake Standard On-Demand).",
        ),
    ] = DEFAULT_RATE_PER_CREDIT_USD,
    no_minimum_billing: Annotated[
        bool,
        typer.Option(
            "--no-minimum-billing",
            help="Skip the 60s minimum-billing floor that Snowflake applies on warehouse resume.",
        ),
    ] = False,
    output: Annotated[
        Path | None,
        typer.Option("--output", "-o", help="Write output to a file instead of stdout."),
    ] = None,
) -> None:
    """Replay an observed run: per-thread utilization, critical path, and idle-gap attribution."""
    if warehouse_size is not None and credits_per_hour is not None:
        raise typer.BadParameter(
            "--warehouse-size and --credits-per-hour are mutually exclusive."
        )
    cost_requested = warehouse_size is not None or credits_per_hour is not None
    if not cost_requested and rate_per_credit != DEFAULT_RATE_PER_CREDIT_USD:
        raise typer.BadParameter(
            "--rate-per-credit requires --warehouse-size or --credits-per-hour."
        )
    if not cost_requested and no_minimum_billing:
        raise typer.BadParameter(
            "--no-minimum-billing requires --warehouse-size or --credits-per-hour."
        )

    try:
        data = _load(
            manifest=manifest,
@@ -199,11 +249,21 @@ def replay(
        )
        limit = top_idle_gaps if top_idle_gaps > 0 else 0
        report = build_replay(data, top_idle_gaps_limit=limit)
        cost_report = None
        if cost_requested:
            cost_inputs = cost_inputs_from_replay(
                report,
                warehouse_size=warehouse_size,
                credits_per_hour=credits_per_hour,
                rate_per_credit_usd=rate_per_credit,
                apply_minimum_billing=not no_minimum_billing,
            )
            cost_report = compute_cost(cost_inputs)
    except DbtDagOptError as exc:
        typer.secho(f"error: {exc}", fg=typer.colors.RED, err=True)
        raise typer.Exit(code=1) from exc

-    rendered = render_replay(report, fmt)
+    rendered = render_replay(report, fmt, cost=cost_report)

    if output is not None:
        output.write_text(rendered, encoding="utf-8")
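
The flag rules enforced at the top of `replay` can be read as a small pure function. The sketch below restates them outside Typer, raising `ValueError` where the CLI raises `typer.BadParameter`. `validate_cost_flags` is a hypothetical name used only for illustration and is not part of this PR.

```python
from typing import Optional

DEFAULT_RATE_PER_CREDIT_USD = 2.0  # mirrors the default used in this PR


def validate_cost_flags(
    warehouse_size: Optional[str],
    credits_per_hour: Optional[float],
    rate_per_credit: float = DEFAULT_RATE_PER_CREDIT_USD,
    no_minimum_billing: bool = False,
) -> bool:
    """Return True if the cost overlay was requested; raise on invalid combinations."""
    if warehouse_size is not None and credits_per_hour is not None:
        raise ValueError("--warehouse-size and --credits-per-hour are mutually exclusive.")

    cost_requested = warehouse_size is not None or credits_per_hour is not None

    # Modifier flags only make sense when a cost overlay is being rendered.
    if not cost_requested and rate_per_credit != DEFAULT_RATE_PER_CREDIT_USD:
        raise ValueError("--rate-per-credit requires --warehouse-size or --credits-per-hour.")
    if not cost_requested and no_minimum_billing:
        raise ValueError("--no-minimum-billing requires --warehouse-size or --credits-per-hour.")

    return cost_requested


print(validate_cost_flags("L", None))   # overlay requested via size
print(validate_cost_flags(None, None))  # plain replay, no overlay
```

Because the check compares `rate_per_credit` against the default value, explicitly passing `--rate-per-credit 2.0` without a size flag is indistinguishable from omitting it and is accepted silently.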
179 changes: 179 additions & 0 deletions src/dbt_dag_opt/cost.py
@@ -0,0 +1,179 @@
"""Snowflake-style cost model layered on top of a ``ReplayReport``.

Translates wall-clock, thread-idleness, and the critical-path floor into
dollar amounts the user can reason about. Kept intentionally small and
primitive-driven so a future ``whatif`` simulator can call ``compute_cost``
twice (baseline + simulated) and diff the two ``CostReport``s without
having to fabricate a full replay.
"""

from __future__ import annotations

from dataclasses import dataclass

from dbt_dag_opt.errors import InvalidArtifactError
from dbt_dag_opt.replay import ReplayReport

SNOWFLAKE_CREDITS_PER_HOUR: dict[str, float] = {
    "XS": 1.0,
    "S": 2.0,
    "M": 4.0,
    "L": 8.0,
    "XL": 16.0,
    "2XL": 32.0,
    "3XL": 64.0,
    "4XL": 128.0,
    "5XL": 256.0,
    "6XL": 512.0,
}

DEFAULT_RATE_PER_CREDIT_USD: float = 2.0
MIN_BILLING_SECONDS: float = 60.0

_SIZE_ALIASES: dict[str, str] = {
    "XSMALL": "XS",
    "SMALL": "S",
    "MEDIUM": "M",
    "LARGE": "L",
    "XLARGE": "XL",
    "2XLARGE": "2XL",
    "3XLARGE": "3XL",
    "4XLARGE": "4XL",
    "5XLARGE": "5XL",
    "6XLARGE": "6XL",
}


@dataclass(frozen=True)
class CostInputs:
    """Primitive inputs to the cost model.

    Decoupled from ``ReplayReport`` so ``whatif`` can pass simulated numbers
    without going through the full parse pipeline.
    """

    wall_clock_seconds: float
    total_cpu_seconds: float
    thread_count: int
    critical_path_seconds: float
    credits_per_hour: float
    rate_per_credit_usd: float
    apply_minimum_billing: bool = True
    warehouse_size_label: str | None = None


@dataclass(frozen=True)
class CostReport:
    """Dollar figures and intermediate quantities derived by ``compute_cost``."""

    inputs: CostInputs
    billed_seconds: float
    rate_per_second_usd: float
    run_cost_usd: float
    floor_cost_usd: float
    headroom_usd: float
    idle_thread_seconds: float
    waste_fraction: float
    idle_cost_usd: float
    min_billing_applied: bool


def credits_per_hour_for(size: str) -> float:
    """Resolve a Snowflake warehouse size alias to credits/hour.

    Case-insensitive; strips ``-``, ``_``, and spaces. Accepts both shorthand
    (``"L"``, ``"2XL"``) and long form (``"Large"``, ``"2X-Large"``).
    """
    normalized = size.upper().replace("-", "").replace(" ", "").replace("_", "")
    if normalized in SNOWFLAKE_CREDITS_PER_HOUR:
        return SNOWFLAKE_CREDITS_PER_HOUR[normalized]
    if normalized in _SIZE_ALIASES:
        return SNOWFLAKE_CREDITS_PER_HOUR[_SIZE_ALIASES[normalized]]
    raise InvalidArtifactError(
        f"unknown warehouse size: {size!r}. "
        f"Expected one of {sorted(SNOWFLAKE_CREDITS_PER_HOUR)}."
    )


def compute_cost(inputs: CostInputs) -> CostReport:
    """Turn primitive timings into run, floor, headroom, and idle dollar figures."""
    rate_per_second = inputs.credits_per_hour * inputs.rate_per_credit_usd / 3600.0

    # Billed time: observed wall-clock, raised to the 60s minimum when enabled.
    raw_wall = max(0.0, inputs.wall_clock_seconds)
    if inputs.apply_minimum_billing and raw_wall < MIN_BILLING_SECONDS:
        billed = MIN_BILLING_SECONDS
        min_billing_applied = True
    else:
        billed = raw_wall
        min_billing_applied = False

    run_cost = billed * rate_per_second

    # Floor: what the critical path alone would bill under the same minimum.
    raw_floor = max(0.0, inputs.critical_path_seconds)
    if inputs.apply_minimum_billing:
        floor_seconds = max(raw_floor, MIN_BILLING_SECONDS)
    else:
        floor_seconds = raw_floor
    floor_cost = floor_seconds * rate_per_second

    # Idle: thread-seconds in which no model was executing, priced as the
    # same fraction of the run cost.
    warehouse_seconds = raw_wall * max(1, inputs.thread_count)
    idle_thread_seconds = max(0.0, warehouse_seconds - max(0.0, inputs.total_cpu_seconds))
    waste_fraction = (
        idle_thread_seconds / warehouse_seconds if warehouse_seconds > 0 else 0.0
    )
    idle_cost = run_cost * waste_fraction
    headroom = max(0.0, run_cost - floor_cost)

    return CostReport(
        inputs=inputs,
        billed_seconds=billed,
        rate_per_second_usd=rate_per_second,
        run_cost_usd=run_cost,
        floor_cost_usd=floor_cost,
        headroom_usd=headroom,
        idle_thread_seconds=idle_thread_seconds,
        waste_fraction=waste_fraction,
        idle_cost_usd=idle_cost,
        min_billing_applied=min_billing_applied,
    )


def cost_inputs_from_replay(
    report: ReplayReport,
    *,
    warehouse_size: str | None,
    credits_per_hour: float | None,
    rate_per_credit_usd: float,
    apply_minimum_billing: bool,
) -> CostInputs:
    """Adapt a ``ReplayReport`` + user flags into ``CostInputs``.

    Exactly one of ``warehouse_size`` / ``credits_per_hour`` must be set.
    The CLI rejects conflicting flags up front; this function re-checks and
    raises ``InvalidArtifactError`` if both or neither are given.
    """
    if warehouse_size is not None and credits_per_hour is not None:
        raise InvalidArtifactError(
            "pass either warehouse_size or credits_per_hour, not both"
        )
    if warehouse_size is not None:
        cph = credits_per_hour_for(warehouse_size)
        label = warehouse_size.upper().replace("-", "").replace(" ", "").replace("_", "")
        if label in _SIZE_ALIASES:
            label = _SIZE_ALIASES[label]
    elif credits_per_hour is not None:
        if credits_per_hour <= 0:
            raise InvalidArtifactError("credits_per_hour must be positive")
        cph = float(credits_per_hour)
        label = None
    else:
        raise InvalidArtifactError(
            "cost inputs require warehouse_size or credits_per_hour"
        )

    return CostInputs(
        wall_clock_seconds=report.wall_clock_seconds,
        total_cpu_seconds=report.total_cpu_seconds,
        thread_count=report.thread_count,
        critical_path_seconds=report.critical_path_seconds,
        credits_per_hour=cph,
        rate_per_credit_usd=rate_per_credit_usd,
        apply_minimum_billing=apply_minimum_billing,
        warehouse_size_label=label,
    )
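
The module docstring anticipates a `whatif` simulator that prices a baseline and a simulated schedule and diffs the results. A minimal standalone sketch of that idea follows, assuming only the run-cost arithmetic from `compute_cost`. The helper `run_cost_usd` and all timings are hypothetical, and the real simulator does not exist yet.

```python
MIN_BILLING_SECONDS = 60.0  # same floor as the module constant


def run_cost_usd(
    wall_clock_s: float,
    credits_per_hour: float,
    rate_per_credit_usd: float = 2.0,
    apply_minimum_billing: bool = True,
) -> float:
    """Price a schedule by wall-clock, optionally raised to the 60s minimum."""
    billed = max(0.0, wall_clock_s)
    if apply_minimum_billing:
        billed = max(billed, MIN_BILLING_SECONDS)
    return billed * credits_per_hour * rate_per_credit_usd / 3600.0


# Baseline (observed) versus a hypothetical faster schedule on a size-L warehouse.
baseline = run_cost_usd(600.0, credits_per_hour=8.0)
simulated = run_cost_usd(420.0, credits_per_hour=8.0)
savings = baseline - simulated
print(f"projected savings: ${savings:.2f}")  # $0.80

# Below the billing floor, a faster schedule no longer changes the bill.
short_a = run_cost_usd(50.0, credits_per_hour=8.0)
short_b = run_cost_usd(20.0, credits_per_hour=8.0)
print(short_a == short_b)  # True
```

Under the 60-second minimum, the 50 s and 20 s runs bill identically, so a projected speedup below the floor shows no dollar savings unless minimum billing is disabled.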