Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ All notable changes to this project will be documented in this file. The format
- `dbt-dag-opt replay` subcommand: reconstructs the observed schedule from `run_results.json`'s `thread_id` + per-phase `timing` data, joined against `manifest.json`'s `parent_map`. Reports per-thread utilization, observed critical path (walked backwards from the last-completing node), and top idle gaps with parent-node attribution.
- Output formats for `replay`: `text` (rich terminal summary, default) and `json` (full replay report, including raw events).
- Integration fixture at `tests/fixtures/dbt_dugout/` — a real Snowflake dbt run (57 nodes, 4 threads) used to smoke-test `replay` end-to-end.
- `analyze --show-path`: render the full chain of node ids for each longest path in the table output.
- `analyze` table now includes a **Bottleneck** column naming the slowest model on each path, plus a **Bottleneck time (s)** column showing that model's time. A bottleneck that appears across multiple rows is a shared-node optimization target.

## [0.1.0] - 2026-04-24

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ dbt-dag-opt analyze [OPTIONS]
--token TEXT dbt Cloud API token [env: DBT_CLOUD_TOKEN]
-f, --format [json|jsonl|table] Output format [default: table]
-n, --top INTEGER Show only top N paths (0 = all) [default: 10]
--show-path Render the full chain of node ids (table format only)
-o, --output PATH Write output to a file instead of stdout
```

The table includes a **Bottleneck** column that names the slowest model on each path, alongside a **Bottleneck time (s)** column with that model's time. The first-order optimization target is the bottleneck model on the #1 path. Watch for a bottleneck that repeats across multiple paths — that's shared-node leverage (optimizing one model helps several paths at once).

### `replay` — what actually happened

`analyze` is theoretical — it reports the DAG-structural lower bound on wall-clock. `replay` reads the *observed* schedule. Every result in `run_results.json` carries a `thread_id` and per-phase `timing` with start/end timestamps, so we can reconstruct:
Expand Down
11 changes: 10 additions & 1 deletion src/dbt_dag_opt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def analyze(
int,
typer.Option("--top", "-n", help="Show only the top N longest paths. Use 0 for all."),
] = 10,
show_path: Annotated[
bool,
typer.Option(
"--show-path",
help="Render the full chain of node ids in the table (table format only).",
),
] = False,
output: Annotated[
Path | None,
typer.Option("--output", "-o", help="Write output to a file instead of stdout."),
Expand All @@ -114,7 +121,9 @@ def analyze(
raise typer.Exit(code=1) from exc

top_value: int | None = top if top > 0 else None
rendered = render(results, fmt, top=top_value)
rendered = render(
results, fmt, top=top_value, show_full_path=show_path, weights=dag.weights
)

if output is not None:
output.write_text(rendered, encoding="utf-8")
Expand Down
63 changes: 55 additions & 8 deletions src/dbt_dag_opt/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ class Format(str, Enum):
TABLE = "table"


def render(results: list[PathResult], fmt: Format, *, top: int | None = None) -> str:
def render(
results: list[PathResult],
fmt: Format,
*,
top: int | None = None,
show_full_path: bool = False,
weights: dict[str, float] | None = None,
) -> str:
ordered = sorted(results, key=lambda r: r.distance, reverse=True)
if top is not None:
ordered = ordered[:top]
Expand All @@ -28,7 +35,7 @@ def render(results: list[PathResult], fmt: Format, *, top: int | None = None) ->
if fmt is Format.JSONL:
return _render_jsonl(ordered)
if fmt is Format.TABLE:
return _render_table(ordered)
return _render_table(ordered, show_full_path=show_full_path, weights=weights)
raise ValueError(f"unknown format: {fmt}")


Expand All @@ -50,19 +57,59 @@ def _render_jsonl(results: list[PathResult]) -> str:
return "\n".join(lines)


def _render_table(results: list[PathResult]) -> str:
def _render_table(
results: list[PathResult],
*,
show_full_path: bool = False,
weights: dict[str, float] | None = None,
) -> str:
buffer = StringIO()
console = Console(file=buffer, force_terminal=False, width=120)
table = Table(title="Longest paths by total execution time", show_lines=False)
console = Console(file=buffer, force_terminal=False, width=140)
table = Table(
title="Longest paths by total execution time",
show_lines=show_full_path, # row separators help when Path cell wraps
)
table.add_column("#", justify="right", style="dim")
table.add_column("Source", overflow="fold")
table.add_column("End of path", overflow="fold")
if show_full_path:
table.add_column("Path", overflow="fold")
else:
table.add_column("End of path", overflow="fold")
table.add_column("Length", justify="right")
table.add_column("Total time (s)", justify="right", style="bold")
if weights is not None:
table.add_column("Bottleneck (slowest on path)", overflow="fold")
table.add_column("Bottleneck time (s)", justify="right")

for idx, r in enumerate(results, start=1):
end = r.path[-1] if r.path else r.source
table.add_row(str(idx), r.source, end, str(len(r.path)), f"{r.distance:.2f}")
row: list[str] = [str(idx), r.source]
if show_full_path:
row.append(_format_path(r.path))
else:
row.append(r.path[-1] if r.path else r.source)
row.extend([str(len(r.path)), f"{r.distance:.2f}"])
if weights is not None:
node, seconds = _bottleneck(r.path, weights)
row.extend([node, f"{seconds:.2f}"])
table.add_row(*row)

console.print(table)
return buffer.getvalue()


def _format_path(path: list[str]) -> str:
return " → ".join(path) if path else "(empty)"


def _bottleneck(path: list[str], weights: dict[str, float]) -> tuple[str, float]:
"""Return (node_id, seconds) for the heaviest node along this path."""
if not path:
return ("-", 0.0)
best_node = path[0]
best_seconds = weights.get(best_node, 0.0)
for node in path[1:]:
seconds = weights.get(node, 0.0)
if seconds > best_seconds:
best_node = node
best_seconds = seconds
return (best_node, best_seconds)
25 changes: 25 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ def test_analyze_from_files_table(
)
assert result.exit_code == 0, result.stdout
assert "source.demo.raw.orders" in result.stdout
# default table now includes the Bottleneck column (weights are passed by the CLI)
assert "Bottleneck" in result.stdout


def test_analyze_show_path_renders_full_chain(
    tiny_manifest_path: Path, tiny_run_results_path: Path
) -> None:
    """`analyze --show-path` with table output prints the chained node ids."""
    cli_args = [
        "analyze",
        "--manifest",
        str(tiny_manifest_path),
        "--run-results",
        str(tiny_run_results_path),
        "--format",
        "table",
        "--show-path",
    ]
    outcome = runner.invoke(app, cli_args)

    assert outcome.exit_code == 0, outcome.stdout
    # fact_orders is the terminal model in the tiny fixture; the intermediate
    # stg_orders sits in between, and the arrow is the path separator.
    for expected in ("stg_orders", "fact_orders", "→"):
        assert expected in outcome.stdout


def test_analyze_output_file_writes_file(
Expand Down
23 changes: 23 additions & 0 deletions tests/test_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,26 @@ def test_render_table_contains_all_sources() -> None:
out = render(_RESULTS, Format.TABLE)
assert "src.a" in out
assert "30.00" in out


def test_render_table_show_full_path_renders_chain() -> None:
    """With ``show_full_path=True`` the table shows the whole node chain."""
    rendered = render(_RESULTS, Format.TABLE, show_full_path=True)
    # The longest path's chain, joined by arrows, must appear verbatim.
    expected_chain = "src.a → mid.a → end.a"
    assert expected_chain in rendered


def test_render_table_weights_adds_bottleneck_column() -> None:
    """Passing ``weights`` adds the bottleneck information to the table."""
    node_seconds = {
        "src.a": 5.0,
        "mid.a": 20.0,
        "end.a": 5.0,
        "src.b": 4.0,
        "end.b": 6.0,
        "src.c": 1.0,
    }
    rendered = render(_RESULTS, Format.TABLE, weights=node_seconds)
    # Bottleneck of the top path is mid.a at 20s.
    for fragment in ("Bottleneck", "mid.a", "20.00"):
        assert fragment in rendered


def test_render_table_show_full_path_and_weights_together() -> None:
    """``show_full_path`` and ``weights`` can be combined in one table."""
    weights = {
        "src.a": 5.0,
        "mid.a": 20.0,
        "end.a": 5.0,
        "src.b": 4.0,
        "end.b": 6.0,
        "src.c": 1.0,
    }
    out = render(_RESULTS, Format.TABLE, show_full_path=True, weights=weights)
    assert "src.a → mid.a → end.a" in out
    # "mid.a" already occurs inside the path chain above, so also check the
    # column header to prove the bottleneck columns were actually rendered.
    assert "Bottleneck" in out
    assert "mid.a" in out
    assert "20.00" in out
Loading