Skip to content

Commit f39d9e1

Browse files
committed
feat(cli): cluv estimate and cluv history subcommands
`cluv estimate <cluster> <job.sh> [-- program-args...]` reproduces the spec_key and dry-runs the estimator, printing the historical records, P95, growth slope, confidence, and the SBATCH_MEM value it would set. `cluv history list|backfill|clear` lets users inspect and manage the local cache that the estimator and the retry loop both feed.
1 parent 0245103 commit f39d9e1

3 files changed

Lines changed: 252 additions & 8 deletions

File tree

cluv/__main__.py

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import rich_argparse
2020
import simple_parsing
2121

22+
from .cli.estimate import estimate
23+
from .cli.history import history
2224
from .cli.init import init
2325
from .cli.login import login
2426
from .cli.run import run
@@ -37,15 +39,22 @@ def main(argv: list[str] | None = None) -> None:
3739
argv = sys.argv[1:]
3840

3941
# argparse consumes '--' before REMAINDER sees it, so we extract program
40-
# args (everything after the first '--' following 'submit') before parsing.
42+
# args (everything after the first '--' following 'submit' or 'estimate')
43+
# before parsing.
4144
submit_program_args: list[str] = []
42-
try:
43-
sub_idx = argv.index("submit")
44-
sep_idx = argv.index("--", sub_idx + 1)
45-
submit_program_args = list(argv[sep_idx + 1 :])
46-
argv = list(argv[:sep_idx])
47-
except ValueError:
48-
pass
45+
estimate_program_args: list[str] = []
46+
for cmd_name, sink in (("submit", "submit"), ("estimate", "estimate")):
47+
try:
48+
sub_idx = argv.index(cmd_name)
49+
sep_idx = argv.index("--", sub_idx + 1)
50+
extracted = list(argv[sep_idx + 1 :])
51+
argv = list(argv[:sep_idx])
52+
if sink == "submit":
53+
submit_program_args = extracted
54+
else:
55+
estimate_program_args = extracted
56+
except ValueError:
57+
continue
4958

5059
parser = simple_parsing.ArgumentParser(
5160
description=__doc__,
@@ -75,6 +84,12 @@ def main(argv: list[str] | None = None) -> None:
7584
status_parser = add_status_args(subparsers)
7685
_add_v_arg(status_parser)
7786

87+
estimate_parser = add_estimate_args(subparsers)
88+
_add_v_arg(estimate_parser)
89+
90+
history_parser = add_history_args(subparsers)
91+
_add_v_arg(history_parser)
92+
7893
args = parser.parse_args(argv)
7994
args_dict = vars(args)
8095

@@ -85,6 +100,8 @@ def main(argv: list[str] | None = None) -> None:
85100

86101
if subcommand == "submit":
87102
args_dict["program_args"] = submit_program_args
103+
elif subcommand == "estimate":
104+
args_dict["program_args"] = estimate_program_args
88105

89106
try:
90107
if inspect.iscoroutinefunction(function):
@@ -136,6 +153,66 @@ def add_submit_args(
136153
return submit_parser
137154

138155

156+
def add_estimate_args(subparsers: Subparsers) -> argparse.ArgumentParser:
    """Register the `estimate` subcommand on `subparsers` and return its parser.

    The subcommand declares two positionals (`cluster`, `job_script`) and a
    `--no-backfill` switch stored under `backfill`, and dispatches to
    `estimate` through the `func` default. No argument is declared here for
    the program arguments shown after `--` in the usage string.
    """
    parser = subparsers.add_parser(
        "estimate",
        help="Dry-run the memory estimator for a job script on a cluster.",
        formatter_class=rich_argparse.RichHelpFormatter,
        usage="cluv estimate <cluster> <job.sh> [-- program-args...]",
    )

    # Positionals, registered in command-line order: (dest, metavar, help).
    positionals = (
        ("cluster", "<cluster>", "The cluster whose history cache to read."),
        (
            "job_script",
            "<job.sh>",
            "Path to the sbatch job script (relative to project root).",
        ),
    )
    for dest, placeholder, description in positionals:
        parser.add_argument(dest, metavar=placeholder, help=description)

    # Backfill is on by default; the flag can only switch it off.
    parser.add_argument(
        "--no-backfill",
        dest="backfill",
        action="store_false",
        help="Skip the sacct backfill on a cold cache.",
    )

    parser.set_defaults(func=estimate, backfill=True)
    return parser
181+
182+
183+
def add_history_args(subparsers: Subparsers) -> argparse.ArgumentParser:
    """Register the `history` subcommand on `subparsers` and return its parser.

    Declares the `action` positional (`list`, `backfill` or `clear`), an
    optional `cluster` positional, and the `--key` / `--since-days` options.
    The parsed namespace dispatches to `history` through the `func` default.
    """
    parser = subparsers.add_parser(
        "history",
        help="Inspect or manage the local sacct memory history cache.",
        formatter_class=rich_argparse.RichHelpFormatter,
    )

    actions = ("list", "backfill", "clear")
    parser.add_argument("action", choices=actions, help="What to do with the cache.")

    # `nargs="?"` makes <cluster> optional at parse time, so argparse itself
    # does not enforce the `backfill` requirement stated in the help text.
    cluster_help = (
        "Cluster name. Required for `backfill`. Optional filter for `list` and `clear`."
    )
    parser.add_argument(
        "cluster",
        nargs="?",
        default=None,
        metavar="<cluster>",
        help=cluster_help,
    )

    parser.add_argument(
        "--key",
        default=None,
        help="Spec key to target with `clear` (requires <cluster>).",
    )
    parser.add_argument(
        "--since-days",
        type=int,
        default=60,
        help="How many days of sacct to pull for `backfill`. Default: 60.",
    )

    parser.set_defaults(func=history)
    return parser
214+
215+
139216
def add_status_args(subparsers: Subparsers) -> argparse.ArgumentParser:
140217
status_parser = subparsers.add_parser(
141218
"status",

cluv/cli/estimate.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
"""`cluv estimate`: dry-run the memory estimator for the current spec."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from pathlib import Path
7+
8+
from rich.table import Table
9+
10+
from cluv import history as history_module
11+
from cluv.cli.login import get_remote_without_2fa_prompt
12+
from cluv.cli.submit import ensure_clean_git_state
13+
from cluv.config import get_config
14+
from cluv.utils import console
15+
16+
logger = logging.getLogger(__name__)
17+
18+
__all__ = ["estimate"]
19+
20+
21+
async def estimate(
    cluster: str,
    job_script: Path,
    program_args: list[str],
    backfill: bool,
) -> None:
    """Show what `cluv submit` would predict for memory, without submitting anything.

    Prints the spec key and estimator settings, then the cached records for
    that key, then the estimate computed from those records.

    Args:
        cluster: Cluster whose history cache is read.
        job_script: Path to the sbatch job script; its string form is part of
            the spec key.
        program_args: Program arguments; also part of the spec key.
        backfill: When true and no records are cached for the key, attempt a
            sacct backfill before concluding that there is no history.
    """
    # NOTE(review): every other import in this module comes from `cluv`;
    # confirm that `salvo.history` is the intended home of these two names.
    from salvo.history import estimate_mem, spec_key

    git_commit = ensure_clean_git_state()
    key = spec_key(str(job_script), git_commit, tuple(program_args))

    # Use the literal fallbacks below when the config's `estimate` entry is falsy.
    cfg = get_config().estimate
    safety = cfg.safety if cfg else 1.2
    window = cfg.window if cfg else 20
    min_samples = cfg.min_samples if cfg else 3

    console.print(f"spec key: [bold]{key}[/bold]")
    console.print(f"cluster: {cluster}")
    console.print(f"safety={safety} window={window} min_samples={min_samples}")

    records = history_module.load(cluster, key)
    if not records and backfill and await _attempt_backfill(cluster):
        # A backfill was attempted (successfully or not): re-read the cache.
        records = history_module.load(cluster, key)

    if not records:
        console.print("[yellow]no records for this key; estimator would skip override.[/yellow]")
        return

    _print_history_table(records, window)
    est = estimate_mem(records, safety=safety, window=window, min_samples=min_samples)
    _print_estimate(est)


async def _attempt_backfill(cluster: str) -> bool:
    """Try a best-effort sacct backfill; return True if a connection was available."""
    remote = await get_remote_without_2fa_prompt(cluster)
    if remote is None:
        console.print(
            f"[yellow]no active connection to {cluster}; skipping sacct backfill.[/yellow] "
            f"Run `cluv login {cluster}` first to enable backfill."
        )
        return False

    try:
        n = await history_module.backfill_from_sacct(remote, cluster)
    except Exception as err:
        # Deliberately best-effort: a failed backfill must not abort the dry
        # run. Keep the traceback reachable through the module logger, since
        # the console message only carries str(err).
        logger.debug("sacct backfill on %s failed", cluster, exc_info=True)
        console.print(f"[yellow]backfill failed: {err}[/yellow]")
    else:
        console.print(f"backfilled {n} record(s) from sacct on {cluster}")
    return True


def _print_history_table(records, window: int) -> None:
    """Print the first `window` records as a table; the title reports the total count."""
    table = Table(title=f"history ({len(records)} record(s), newest first)")
    table.add_column("job id")
    table.add_column("state")
    table.add_column("mem_mb", justify="right")
    table.add_column("max_rss_mb", justify="right")
    table.add_column("submitted_at")
    for r in records[:window]:
        table.add_row(
            r.job_id,
            r.state,
            str(r.mem_mb),
            # A missing max RSS is rendered as a dash.
            "-" if r.max_rss_mb is None else str(r.max_rss_mb),
            r.submitted_at.isoformat(timespec="minutes"),
        )
    console.print(table)


def _print_estimate(est) -> None:
    """Print the estimator result and the SBATCH_MEM decision it implies."""
    console.print(f"\n[bold]estimate:[/bold] {est.rationale}")
    console.print(f" confidence: {est.confidence}")
    console.print(f" n_samples: {est.n_samples}")
    if est.p95_mb is not None:
        console.print(f" p95_mb: {est.p95_mb}")
    if est.growth_slope_mb_per_run is not None:
        console.print(f" growth_slope_mb_per_run: {est.growth_slope_mb_per_run:.1f}")
    if est.mem_mb is None:
        console.print("[yellow]→ SBATCH_MEM would be left untouched.[/yellow]")
    else:
        console.print(f"[green]→ SBATCH_MEM would be set to {est.mem_mb}M.[/green]")

cluv/cli/history.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""`cluv history`: inspect and manage the local sacct-derived memory cache."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
7+
from rich.table import Table
8+
9+
from cluv import history as history_module
10+
from cluv.cli.login import get_remote_without_2fa_prompt
11+
from cluv.utils import console
12+
13+
logger = logging.getLogger(__name__)
14+
15+
__all__ = ["history"]
16+
17+
18+
async def history(
    action: str,
    cluster: str | None,
    key: str | None,
    since_days: int,
) -> None:
    """Run the `cluv history` action named by `action`.

    Args:
        action: One of `list`, `backfill` or `clear`; anything else prints an
            error message.
        cluster: Cluster name. Passed through to `list` and `clear`; must be
            non-empty for `backfill`.
        key: Spec key, only forwarded to `clear`.
        since_days: Number of days forwarded to `backfill`.
    """
    if action == "list":
        _list(cluster)
        return

    if action == "clear":
        _clear(cluster, key)
        return

    if action != "backfill":
        console.print(f"[red]unknown history action: {action}[/red]")
        return

    # `backfill` is the only action that cannot run without a cluster.
    if cluster:
        await _backfill(cluster, since_days)
    else:
        console.print("[red]`cluv history backfill` requires a <cluster> argument.[/red]")
36+
37+
38+
def _list(cluster: str | None) -> None:
    """Print one table row per cached (cluster, key, record count) entry.

    When nothing is cached (optionally: for `cluster`), print a notice and the
    cache directory instead of a table.
    """
    entries = history_module.list_keys(cluster)

    if entries:
        table = Table(title=f"history cache ({history_module.cache_dir()})")
        # Only the record count column is right-aligned.
        column_specs = (("cluster", {}), ("key", {}), ("records", {"justify": "right"}))
        for heading, options in column_specs:
            table.add_column(heading, **options)
        for cluster_name, spec, count in entries:
            table.add_row(cluster_name, spec, str(count))
        console.print(table)
        return

    suffix = "" if not cluster else f" for {cluster}"
    console.print(f"[yellow]no records cached{suffix}.[/yellow]")
    console.print(f"cache dir: {history_module.cache_dir()}")
52+
53+
54+
async def _backfill(cluster: str, since_days: int) -> None:
    """Backfill the cache for `cluster` from sacct and report the record count.

    If `get_remote_without_2fa_prompt` yields no remote, print a hint to run
    `cluv login` and do nothing else.
    """
    remote = await get_remote_without_2fa_prompt(cluster)

    if remote is not None:
        count = await history_module.backfill_from_sacct(
            remote, cluster, since_days=since_days
        )
        console.print(
            f"backfilled {count} record(s) from sacct on {cluster} (last {since_days} days)."
        )
        return

    console.print(
        f"[red]no active SSH connection to {cluster}.[/red] "
        f"Run `cluv login {cluster}` first."
    )
64+
65+
66+
def _clear(cluster: str | None, key: str | None) -> None:
    """Clear cached history and report how many cache files were deleted.

    `key` is only accepted together with `cluster`; otherwise an error is
    printed and nothing is cleared. With neither argument, `clear` is called
    with `(None, None)` and the report reads "(all clusters)".
    """
    if key and not cluster:
        console.print("[red]`--key` requires `<cluster>`.[/red]")
        return

    deleted = history_module.clear(cluster, key)

    # Describe what was targeted, from broadest to narrowest scope.
    if not cluster:
        scope = "(all clusters)"
    elif key:
        scope = f"({cluster}/{key})"
    else:
        scope = f"({cluster})"

    console.print(f"deleted {deleted} cache file(s) {scope}.")

0 commit comments

Comments
 (0)