Skip to content

Commit 1f577fa

Browse files
authored
[Iris] Add CPU wall time endpoint (#5637)
This calculates the total CPU wall time for a given Iris job. It does this by summing the wall time for all leaf jobs (defined as not having a child job) for all descendant jobs of this job. ### Open Questions - @ravwojdyla this is what I was imagining when I filed #5495, but I realized this does not directly match any of the metrics you're returning in `collect_perf_metrics.py`. - I kept this simple to start, but we can add as many more summary stats as we want to this endpoint (like per-task breakdown, etc.). - I didn't add this to the UI because it didn't feel like there was resolution on that. - We could add this to the existing job stats endpoint, but since it's doing a bit more work and seems somewhat separate, I broke it out (Claude is great at boilerplate 😄). I don't feel remotely strongly about where it lives. Fixes #5495
1 parent 7d9856e commit 1f577fa

1 file changed

Lines changed: 133 additions & 0 deletions

File tree

scripts/datakit/collect_perf_metrics.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,59 @@
7474
"other",
7575
)
7676

77+
# Must match job_pb2.TASK_STATE_SUCCEEDED — kept as a literal to avoid
78+
# importing iris internals from this script.
79+
_TASK_STATE_SUCCEEDED = 4
80+
81+
# Wrapped in an outer SELECT so the query starts with SELECT (required by ExecuteRawQuery).
82+
_TASK_WALL_TIME_SQL = """\
83+
SELECT task_wall_ms FROM (
84+
WITH RECURSIVE descendants(job_id) AS (
85+
SELECT job_id FROM jobs WHERE job_id = '{job_id}'
86+
UNION ALL
87+
SELECT j.job_id FROM jobs j
88+
JOIN descendants d ON j.parent_job_id = d.job_id
89+
),
90+
leaves(job_id) AS (
91+
SELECT d.job_id FROM descendants d
92+
WHERE NOT EXISTS (SELECT 1 FROM jobs c WHERE c.parent_job_id = d.job_id)
93+
)
94+
SELECT SUM(ta.finished_at_ms - ta.started_at_ms) AS task_wall_ms
95+
FROM task_attempts ta
96+
JOIN tasks t ON t.task_id = ta.task_id
97+
JOIN leaves USING (job_id)
98+
WHERE ta.started_at_ms IS NOT NULL
99+
AND ta.finished_at_ms IS NOT NULL
100+
AND ta.finished_at_ms > ta.started_at_ms
101+
{state_filter}
102+
)"""
103+
104+
# Per-direct-child breakdown: the recursive CTE carries child_job_id (the
105+
# direct child of the root job each descendant belongs to) so we can group by it.
106+
_TASK_WALL_TIME_BY_CHILD_SQL = """\
107+
SELECT child_job_id, SUM(duration_ms) AS task_wall_ms FROM (
108+
WITH RECURSIVE descendants(job_id, child_job_id) AS (
109+
SELECT job_id, job_id AS child_job_id FROM jobs WHERE parent_job_id = '{job_id}'
110+
UNION ALL
111+
SELECT j.job_id, d.child_job_id FROM jobs j
112+
JOIN descendants d ON j.parent_job_id = d.job_id
113+
),
114+
leaves(job_id, child_job_id) AS (
115+
SELECT d.job_id, d.child_job_id FROM descendants d
116+
WHERE NOT EXISTS (SELECT 1 FROM jobs c WHERE c.parent_job_id = d.job_id)
117+
)
118+
SELECT ta.finished_at_ms - ta.started_at_ms AS duration_ms, leaves.child_job_id
119+
FROM task_attempts ta
120+
JOIN tasks t ON t.task_id = ta.task_id
121+
JOIN leaves USING (job_id)
122+
WHERE ta.started_at_ms IS NOT NULL
123+
AND ta.finished_at_ms IS NOT NULL
124+
AND ta.finished_at_ms > ta.started_at_ms
125+
{state_filter}
126+
)
127+
GROUP BY child_job_id
128+
ORDER BY child_job_id"""
129+
77130

78131
@dataclass
79132
class PerfReport:
@@ -84,6 +137,8 @@ class PerfReport:
84137
marin_prefix: str | None = None
85138
wall_seconds_total: float | None = None
86139
stage_wall_seconds: dict[str, float] = field(default_factory=dict)
140+
sum_task_wall_seconds_total: float | None = None
141+
stage_sum_task_wall_seconds: dict[str, float | None] = field(default_factory=dict)
87142
cached_steps: list[str] = field(default_factory=list)
88143
ooms: int = 0
89144
failed_shards: int = 0
@@ -184,6 +239,63 @@ def fetch_leaf_summaries(job_tree: list[dict], iris_config: Path) -> list[dict]:
184239
return summaries
185240

186241

242+
def fetch_raw_query_task_wall_ms(job_id: str, iris_config: Path, *, include_failed: bool = False) -> int | None:
    """Sum per-attempt wall-clock durations across the subtree via ExecuteRawQuery.

    Args:
        job_id: Root job whose descendant leaf jobs are aggregated.
        iris_config: Iris config path passed through to the CLI invocation.
        include_failed: If True, count attempts regardless of task state;
            otherwise only tasks in TASK_STATE_SUCCEEDED are counted.

    Returns:
        Total task wall time in milliseconds, 0 when the query matched no
        attempts (SQL SUM over zero rows is NULL), or None when the query
        failed or produced output we could not interpret.
    """
    state_filter = "" if include_failed else f"AND t.state = {_TASK_STATE_SUCCEEDED}"
    # job_id is interpolated directly into the SQL text, so escape single
    # quotes with the standard SQL doubling before formatting.
    sql = _TASK_WALL_TIME_SQL.format(job_id=job_id.replace("'", "''"), state_filter=state_filter)
    result = _run_iris(["query", "--format=json", sql], iris_config)
    if result.returncode != 0:
        logger.warning("iris query task_wall_ms failed (exit %s): %s", result.returncode, result.stderr.strip())
        return None
    try:
        rows = json.loads(result.stdout)
        if not rows:
            return None
        val = rows[0]["task_wall_ms"]
        # NULL SUM (no matching attempts) arrives as JSON null -> report 0.
        return int(val) if val is not None else 0
    except (json.JSONDecodeError, KeyError, IndexError, TypeError, ValueError) as exc:
        # IndexError/TypeError included so unexpected-but-valid JSON shapes
        # (non-list payload, non-numeric value) degrade to None like every
        # other failure mode of this best-effort helper, instead of raising.
        logger.warning("iris query returned unexpected output: %s", exc)
        return None
259+
260+
261+
def fetch_raw_query_task_wall_ms_by_child(
    job_id: str, iris_config: Path, *, include_failed: bool = False
) -> dict[str, int] | None:
    """Return per-direct-child task wall ms via ExecuteRawQuery, keyed by child job_id.

    Args:
        job_id: Root job; each key in the result is one of its direct children.
        iris_config: Iris config path passed through to the CLI invocation.
        include_failed: If True, count attempts regardless of task state;
            otherwise only tasks in TASK_STATE_SUCCEEDED are counted.

    Returns:
        Mapping of direct-child job_id -> summed task wall time in ms, or None
        when the query failed or produced output we could not interpret.
    """
    state_filter = "" if include_failed else f"AND t.state = {_TASK_STATE_SUCCEEDED}"
    # Escape single quotes (SQL doubling) since job_id is interpolated into the query text.
    sql = _TASK_WALL_TIME_BY_CHILD_SQL.format(job_id=job_id.replace("'", "''"), state_filter=state_filter)
    result = _run_iris(["query", "--format=json", sql], iris_config)
    if result.returncode != 0:
        logger.warning("iris query by_child failed (exit %s): %s", result.returncode, result.stderr.strip())
        return None
    try:
        rows = json.loads(result.stdout)
        # A NULL SUM should not occur for a GROUP BY bucket, but guard anyway:
        # int(None) would otherwise raise an uncaught TypeError. Treat null as 0.
        return {row["child_job_id"]: int(row["task_wall_ms"] or 0) for row in rows}
    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as exc:
        # TypeError included so unexpected-but-valid JSON shapes (non-dict
        # rows, non-numeric sums) degrade to None like other failure modes.
        logger.warning("iris query by_child returned unexpected output: %s", exc)
        return None
277+
278+
279+
def bucket_by_step(by_child: dict[str, int], parent_id: str) -> dict[str, int | None]:
    """Bucket per-child task_wall_ms into step names using the same prefix logic as compute_stage_wall_seconds.

    All EXPECTED_STEPS are always present; steps with no matching child jobs have value None.
    """
    direct_child_depth = _job_depth(parent_id) + 1
    buckets: dict[str, int | None] = dict.fromkeys(EXPECTED_STEPS)
    for child_id, wall_ms in by_child.items():
        # Only direct children of the root job participate in the grouping.
        if not child_id.startswith(parent_id) or _job_depth(child_id) != direct_child_depth:
            continue
        leaf_name = child_id.rsplit("/", 1)[-1]
        # First matching prefix wins, mirroring compute_stage_wall_seconds.
        step = next((s for prefix, s in _STEP_PREFIXES.items() if leaf_name.startswith(prefix)), None)
        if step is not None:
            buckets[step] = (buckets[step] or 0) + wall_ms
    return buckets
297+
298+
187299
def aggregate_per_task_metrics(summaries: list[dict]) -> tuple[int, dict[str, int], int, int]:
188300
"""Walk every task across all summaries and return cross-tree per-task metrics.
189301
@@ -507,6 +619,12 @@ def upload_report_to_gcs(report: PerfReport, gcs_prefix: str, report_name: str,
507619
default=None,
508620
help="If set, write the resulting GCS URL to this $GITHUB_OUTPUT key.",
509621
)
622+
@click.option(
623+
"--raw-query-cpu-time/--no-raw-query-cpu-time",
624+
"fetch_raw_query_cpu_time",
625+
default=True,
626+
help="Fetch CPU wall time via ExecuteRawQuery and include in the report.",
627+
)
510628
def main(
511629
job_id: str,
512630
iris_config: Path,
@@ -515,6 +633,7 @@ def main(
515633
out: Path | None,
516634
gcs_prefix: str | None,
517635
gcs_output_env: str | None,
636+
fetch_raw_query_cpu_time: bool,
518637
) -> None:
519638
"""Collect a perf report for a finished datakit ferry run.
520639
@@ -549,6 +668,20 @@ def main(
549668
workflow_env=workflow_env,
550669
)
551670

671+
if fetch_raw_query_cpu_time:
672+
task_wall_ms = fetch_raw_query_task_wall_ms(job_id, iris_config)
673+
if task_wall_ms is None:
674+
report.warnings.append("iris query task_wall_ms: failed; sum_task_wall_seconds_total unset")
675+
else:
676+
report.sum_task_wall_seconds_total = task_wall_ms / 1000.0
677+
by_child = fetch_raw_query_task_wall_ms_by_child(job_id, iris_config)
678+
if by_child is None:
679+
report.warnings.append("iris query by_child: failed; stage_sum_task_wall_seconds empty")
680+
else:
681+
report.stage_sum_task_wall_seconds = {
682+
step: ms / 1000.0 if ms is not None else None for step, ms in bucket_by_step(by_child, job_id).items()
683+
}
684+
552685
if out is not None:
553686
write_report_local(report, out)
554687
logger.info("Wrote perf report to %s", out)

0 commit comments

Comments
 (0)