Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- `dbt-dag-opt replay` subcommand: reconstructs the observed schedule from `run_results.json`'s `thread_id` + per-phase `timing` data, joined against `manifest.json`'s `parent_map`. Reports per-thread utilization, observed critical path (walked backwards from the last-completing node), and top idle gaps with parent-node attribution.
- Output formats for `replay`: `text` (rich terminal summary, default) and `json` (full replay report, including raw events).
- Integration fixture at `tests/fixtures/dbt_dugout/` — a real Snowflake dbt run (57 nodes, 4 threads) used to smoke-test `replay` end-to-end.

## [0.1.0] - 2026-04-24

Initial PyPI release. Complete rewrite of the pre-release prototype.
Expand Down
34 changes: 28 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Add `--run-id <id>` to pull artifacts from a specific historical run instead of

## CLI reference

### `analyze` — critical path through the DAG

```
dbt-dag-opt analyze [OPTIONS]

Expand All @@ -68,11 +70,31 @@ dbt-dag-opt analyze [OPTIONS]
-o, --output PATH Write output to a file instead of stdout
```

### `replay` — what actually happened

`analyze` is theoretical — it reports the DAG-structural lower bound on wall-clock. `replay` reads the *observed* schedule. Every result in `run_results.json` carries a `thread_id` and per-phase `timing` with start/end timestamps, so we can reconstruct:

- **Per-thread utilization** — how much of the run each worker was busy vs. idle.
- **Observed critical path** — the chain of nodes that actually determined wall-clock, walked backwards from the last-completing node.
- **Idle-gap attribution** — for every stretch of idle time, which upstream node's completion unblocked the thread. Gaps with no blocker are scheduler overhead, not DAG blocking.

```
dbt-dag-opt replay [OPTIONS]

--manifest PATH Path to manifest.json (file mode)
--run-results PATH Path to run_results.json (file mode)
--account-id / --job-id dbt Cloud mode (same as analyze)
-f, --format [text|json] Output format [default: text]
--top-idle-gaps INTEGER How many idle gaps to surface [default: 10]
-o, --output PATH Write output to a file instead of stdout
```

### Output formats

- `table` — rich terminal table (default; what you want in a shell).
- `json` — one object keyed by source: `{source_id: {path, distance, length}}`. Valid JSON, safe to pipe through `jq`.
- `jsonl` — one JSON object per line. Nice for streaming into a log aggregator.
- `table` — rich terminal table (default for `analyze`).
- `text` — rich-rendered summary (default for `replay`): run summary, per-thread utilization, observed critical path, top idle gaps.
- `json` — `analyze` emits `{source_id: {path, distance, length}}`; `replay` emits the full replay report. Both are `jq`-friendly.
- `jsonl` — one JSON object per line (`analyze` only).

## How it works

Expand All @@ -85,11 +107,11 @@ Distances sum the execution time of every node along the path — that's the war

## What this is / isn't

It **is** a CLI tool that points at the slowest chains in your DAG.
It **is** a CLI tool that points at the slowest chains in your DAG and — as of `replay` — the observed schedule that those chains actually produced.

It **isn't** (yet):
- A scheduler simulator. If your dbt `threads` setting is low, total wall-clock is bounded by parallelism *and* the critical path; v0.2 will surface both. For now, treat the critical-path distance as a lower bound.
- A cost model. Multiplying distance × your warehouse rate is on you — a `--warehouse-size` flag is planned for v0.3.
- A predictive scheduler simulator. `replay` reconstructs what already happened; it doesn't yet project what would happen under a different `--threads N` or if you sped up a specific model. That "what-if" loop is planned next.
- A cost model. Multiplying wall-clock × your warehouse rate is on you — a `--warehouse-size` flag is planned alongside the what-if loop.

## Development

Expand Down
82 changes: 82 additions & 0 deletions src/dbt_dag_opt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from dbt_dag_opt.graph import build_dag
from dbt_dag_opt.longest_path import longest_paths_from_each_source
from dbt_dag_opt.models import DagArtifacts
from dbt_dag_opt.replay import build_replay
from dbt_dag_opt.replay_formatters import ReplayFormat, render_replay

app = typer.Typer(
name="dbt-dag-opt",
Expand Down Expand Up @@ -123,6 +125,86 @@ def analyze(
sys.stdout.write("\n")


@app.command("replay")
def replay(
manifest: Annotated[
Path | None,
typer.Option("--manifest", help="Path to manifest.json (file mode)."),
] = None,
run_results: Annotated[
Path | None,
typer.Option("--run-results", help="Path to run_results.json (file mode)."),
] = None,
account_id: Annotated[
str | None,
typer.Option("--account-id", help="dbt Cloud account id (cloud mode)."),
] = None,
job_id: Annotated[
str | None,
typer.Option("--job-id", help="dbt Cloud job id (cloud mode)."),
] = None,
run_id: Annotated[
str | None,
typer.Option(
"--run-id",
help="dbt Cloud run id (cloud mode). If omitted, uses the job's latest run.",
),
] = None,
base_url: Annotated[
str,
typer.Option("--base-url", help="dbt Cloud base URL."),
] = artifacts.DEFAULT_BASE_URL,
token: Annotated[
str | None,
typer.Option(
"--token",
help="dbt Cloud API token. Prefer setting DBT_CLOUD_TOKEN in the environment.",
envvar="DBT_CLOUD_TOKEN",
),
] = None,
fmt: Annotated[
ReplayFormat, typer.Option("--format", "-f", help="Output format.")
] = ReplayFormat.TEXT,
top_idle_gaps: Annotated[
int,
typer.Option(
"--top-idle-gaps",
help="Number of idle gaps to surface. Use 0 to suppress.",
),
] = 10,
output: Annotated[
Path | None,
typer.Option("--output", "-o", help="Write output to a file instead of stdout."),
] = None,
) -> None:
"""Replay an observed run: per-thread utilization, critical path, and idle-gap attribution."""
try:
data = _load(
manifest=manifest,
run_results=run_results,
account_id=account_id,
job_id=job_id,
run_id=run_id,
base_url=base_url,
token=token,
)
limit = top_idle_gaps if top_idle_gaps > 0 else 0
report = build_replay(data, top_idle_gaps_limit=limit)
except DbtDagOptError as exc:
typer.secho(f"error: {exc}", fg=typer.colors.RED, err=True)
raise typer.Exit(code=1) from exc

rendered = render_replay(report, fmt)

if output is not None:
output.write_text(rendered, encoding="utf-8")
typer.echo(f"Wrote replay report to {output}")
else:
sys.stdout.write(rendered)
if not rendered.endswith("\n"):
sys.stdout.write("\n")


def _load(
*,
manifest: Path | None,
Expand Down
Loading
Loading