Skip to content

Commit 51ab842

Browse files
authored
Merge pull request #3 from trouze/feat/replay
add replay command: reconstruct the observed schedule
2 parents 24e95a6 + 96f04c8 commit 51ab842

10 files changed

Lines changed: 900 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
44

5+
## [Unreleased]
6+
7+
### Added
8+
9+
- `dbt-dag-opt replay` subcommand: reconstructs the observed schedule from `run_results.json`'s `thread_id` + per-phase `timing` data, joined against `manifest.json`'s `parent_map`. Reports per-thread utilization, observed critical path (walked backwards from the last-completing node), and top idle gaps with parent-node attribution.
10+
- Output formats for `replay`: `text` (rich terminal summary, default) and `json` (full replay report, including raw events).
11+
- Integration fixture at `tests/fixtures/dbt_dugout/` — a real Snowflake dbt run (57 nodes, 4 threads) used to smoke-test `replay` end-to-end.
12+
513
## [0.1.0] - 2026-04-24
614

715
Initial PyPI release. Complete rewrite of the pre-release prototype.

README.md

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ Add `--run-id <id>` to pull artifacts from a specific historical run instead of
5353

5454
## CLI reference
5555

56+
### `analyze` — critical path through the DAG
57+
5658
```
5759
dbt-dag-opt analyze [OPTIONS]
5860
@@ -68,11 +70,31 @@ dbt-dag-opt analyze [OPTIONS]
6870
-o, --output PATH Write output to a file instead of stdout
6971
```
7072

73+
### `replay` — what actually happened
74+
75+
`analyze` is theoretical — it reports the DAG-structural lower bound on wall-clock. `replay` reads the *observed* schedule. Every result in `run_results.json` carries a `thread_id` and per-phase `timing` with start/end timestamps, so we can reconstruct:
76+
77+
- **Per-thread utilization** — how much of the run each worker was busy vs. idle.
78+
- **Observed critical path** — the chain of nodes that actually determined wall-clock, walked backwards from the last-completing node.
79+
- **Idle-gap attribution** — for every stretch of idle time, which upstream node's completion unblocked the thread. Gaps with no blocker are scheduler overhead, not DAG blocking.
80+
81+
```
82+
dbt-dag-opt replay [OPTIONS]
83+
84+
--manifest PATH Path to manifest.json (file mode)
85+
--run-results PATH Path to run_results.json (file mode)
86+
--account-id / --job-id dbt Cloud mode (same as analyze)
87+
-f, --format [text|json] Output format [default: text]
88+
--top-idle-gaps INTEGER How many idle gaps to surface [default: 10]
89+
-o, --output PATH Write output to a file instead of stdout
90+
```
91+
7192
### Output formats
7293

73-
- `table` — rich terminal table (default; what you want in a shell).
74-
- `json` — one object keyed by source: `{source_id: {path, distance, length}}`. Valid JSON, safe to pipe through `jq`.
75-
- `jsonl` — one JSON object per line. Nice for streaming into a log aggregator.
94+
- `table` — rich terminal table (default for `analyze`).
95+
- `text` — rich-rendered summary (default for `replay`): run summary, per-thread utilization, observed critical path, top idle gaps.
96+
- `json``analyze` emits `{source_id: {path, distance, length}}`; `replay` emits the full replay report. Both are `jq`-friendly.
97+
- `jsonl` — one JSON object per line (`analyze` only).
7698

7799
## How it works
78100

@@ -85,11 +107,11 @@ Distances sum the execution time of every node along the path — that's the war
85107

86108
## What this is / isn't
87109

88-
It **is** a CLI tool that points at the slowest chains in your DAG.
110+
It **is** a CLI tool that points at the slowest chains in your DAG and — as of `replay` — the observed schedule that those chains actually produced.
89111

90112
It **isn't** (yet):
91-
- A scheduler simulator. If your dbt `threads` setting is low, total wall-clock is bounded by parallelism *and* the critical path; v0.2 will surface both. For now, treat the critical-path distance as a lower bound.
92-
- A cost model. Multiplying distance × your warehouse rate is on you — a `--warehouse-size` flag is planned for v0.3.
113+
- A predictive scheduler simulator. `replay` reconstructs what already happened; it doesn't yet project what would happen under a different `--threads N` or if you sped up a specific model. That "what-if" loop is planned next.
114+
- A cost model. Multiplying wall-clock × your warehouse rate is on you — a `--warehouse-size` flag is planned alongside the what-if loop.
93115

94116
## Development
95117

src/dbt_dag_opt/cli.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from dbt_dag_opt.graph import build_dag
1616
from dbt_dag_opt.longest_path import longest_paths_from_each_source
1717
from dbt_dag_opt.models import DagArtifacts
18+
from dbt_dag_opt.replay import build_replay
19+
from dbt_dag_opt.replay_formatters import ReplayFormat, render_replay
1820

1921
app = typer.Typer(
2022
name="dbt-dag-opt",
@@ -123,6 +125,86 @@ def analyze(
123125
sys.stdout.write("\n")
124126

125127

128+
@app.command("replay")
129+
def replay(
130+
manifest: Annotated[
131+
Path | None,
132+
typer.Option("--manifest", help="Path to manifest.json (file mode)."),
133+
] = None,
134+
run_results: Annotated[
135+
Path | None,
136+
typer.Option("--run-results", help="Path to run_results.json (file mode)."),
137+
] = None,
138+
account_id: Annotated[
139+
str | None,
140+
typer.Option("--account-id", help="dbt Cloud account id (cloud mode)."),
141+
] = None,
142+
job_id: Annotated[
143+
str | None,
144+
typer.Option("--job-id", help="dbt Cloud job id (cloud mode)."),
145+
] = None,
146+
run_id: Annotated[
147+
str | None,
148+
typer.Option(
149+
"--run-id",
150+
help="dbt Cloud run id (cloud mode). If omitted, uses the job's latest run.",
151+
),
152+
] = None,
153+
base_url: Annotated[
154+
str,
155+
typer.Option("--base-url", help="dbt Cloud base URL."),
156+
] = artifacts.DEFAULT_BASE_URL,
157+
token: Annotated[
158+
str | None,
159+
typer.Option(
160+
"--token",
161+
help="dbt Cloud API token. Prefer setting DBT_CLOUD_TOKEN in the environment.",
162+
envvar="DBT_CLOUD_TOKEN",
163+
),
164+
] = None,
165+
fmt: Annotated[
166+
ReplayFormat, typer.Option("--format", "-f", help="Output format.")
167+
] = ReplayFormat.TEXT,
168+
top_idle_gaps: Annotated[
169+
int,
170+
typer.Option(
171+
"--top-idle-gaps",
172+
help="Number of idle gaps to surface. Use 0 to suppress.",
173+
),
174+
] = 10,
175+
output: Annotated[
176+
Path | None,
177+
typer.Option("--output", "-o", help="Write output to a file instead of stdout."),
178+
] = None,
179+
) -> None:
180+
"""Replay an observed run: per-thread utilization, critical path, and idle-gap attribution."""
181+
try:
182+
data = _load(
183+
manifest=manifest,
184+
run_results=run_results,
185+
account_id=account_id,
186+
job_id=job_id,
187+
run_id=run_id,
188+
base_url=base_url,
189+
token=token,
190+
)
191+
limit = top_idle_gaps if top_idle_gaps > 0 else 0
192+
report = build_replay(data, top_idle_gaps_limit=limit)
193+
except DbtDagOptError as exc:
194+
typer.secho(f"error: {exc}", fg=typer.colors.RED, err=True)
195+
raise typer.Exit(code=1) from exc
196+
197+
rendered = render_replay(report, fmt)
198+
199+
if output is not None:
200+
output.write_text(rendered, encoding="utf-8")
201+
typer.echo(f"Wrote replay report to {output}")
202+
else:
203+
sys.stdout.write(rendered)
204+
if not rendered.endswith("\n"):
205+
sys.stdout.write("\n")
206+
207+
126208
def _load(
127209
*,
128210
manifest: Path | None,

0 commit comments

Comments
 (0)