-
Notifications
You must be signed in to change notification settings - Fork 461
Expand file tree
/
Copy pathserver.py
More file actions
501 lines (475 loc) · 18.1 KB
/
Copy pathserver.py
File metadata and controls
501 lines (475 loc) · 18.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""modelopt-mcp server entry point.
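Stdio transport; codex / Claude Code launch this as a subprocess and
talk to it over stdin/stdout. See OMNIML-5123 for the design.

Phase 1 tool surface:
  * list_examples — discover bundled launcher YAMLs
  * verify_setup — fail-fast probe (docker or slurm)
  * submit_job — submit a launcher YAML; mode by args
  * job_status — filesystem-based status
  * job_logs — filesystem-based logs

Additional tools registered by this module:
  * wait_for_experiment — block until a terminal status or a timeout
  * provision_passwordless_ssh_dry_run — emit (never execute) SSH key setup commands
  * read_cluster_artifact — read a remote experiment file or job log
  * open_draft_pr — push the current branch and open a draft PR

All tools return JSON-friendly dicts with explicit ``ok`` / ``reason`` /
``diagnostic`` fields so the calling LLM can route on structured
outcomes instead of free-form prose.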
"""
from __future__ import annotations
import logging
import os
import sys
from typing import Annotated, Literal
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from modelopt_mcp import bridge
# Package logger. main() points logging at stderr because stdout is the
# stdio transport channel the MCP client reads protocol traffic from.
logger = logging.getLogger("modelopt_mcp")
def _build_server() -> FastMCP:
    """Construct the MCP server and register every tool on it.

    Factored out so tests can build an isolated instance without
    stdio plumbing.

    Each tool forwards its arguments to the matching ``bridge.*_impl``
    function; ``verify_setup`` additionally dispatches on ``executor``.
    The ``description`` strings and ``Field`` metadata below are the
    text passed to FastMCP for each tool and parameter.

    Returns:
        The configured ``FastMCP`` instance; the caller is responsible
        for invoking ``run()`` on it.
    """
    mcp = FastMCP("modelopt")

    @mcp.tool(
        name="list_examples",
        description=(
            "List all bundled launcher YAML examples under "
            "tools/launcher/examples/, with model + description "
            "metadata extracted from each YAML. Use BEFORE submit_job "
            "when you don't know which YAML to launch — this gives the "
            "agent the discovery surface needed to pick one."
        ),
    )
    def list_examples() -> dict:
        """Forward to ``bridge.list_examples_impl``."""
        return bridge.list_examples_impl()

    @mcp.tool(
        name="verify_setup",
        description=(
            "Probe whether the named executor is reachable from THIS "
            "host. Run BEFORE submit_job to fail fast — the actual "
            "submission burns 30+ seconds (slurm) or starts a Docker "
            "container (docker) before discovering setup is broken.\n\n"
            "Docker mode: checks `docker info` (daemon up) + GPU "
            "passthrough (`docker run --gpus all nvidia-smi`). Set "
            "MODELOPT_MCP_SKIP_GPU_CHECK=1 in the env to skip the GPU "
            "check on CPU-only hosts.\n\n"
            "Slurm mode: ssh -o BatchMode=yes -o ConnectTimeout=5 "
            "to the cluster login node. Refuses to prompt for "
            "password, so key-auth failure is detected immediately."
        ),
    )
    def verify_setup(
        executor: Annotated[
            Literal["docker", "slurm"],
            Field(
                description=(
                    "Which executor to probe: 'docker' for local GPU or 'slurm' for remote cluster."
                )
            ),
        ],
        cluster_host: Annotated[
            str | None,
            Field(description=("Slurm cluster login hostname. Required when executor='slurm'.")),
        ] = None,
        cluster_user: Annotated[
            str | None, Field(description=("SSH user for the cluster. None uses ssh's default."))
        ] = None,
        identity: Annotated[
            str | None,
            Field(
                description=("SSH identity file (-i) override. None uses default key / ssh-agent.")
            ),
        ] = None,
    ) -> dict:
        """Dispatch the setup probe to the docker or slurm bridge implementation."""
        if executor == "docker":
            return bridge.verify_docker_setup_impl()
        if executor == "slurm":
            # Reject a missing/empty host here so the bridge is never asked to
            # ssh to nothing.
            if not cluster_host:
                return {
                    "ok": False,
                    "executor": "slurm",
                    "reason": "missing_cluster_host",
                    "diagnostic": ("executor='slurm' requires cluster_host=<hostname>."),
                }
            return bridge.verify_slurm_setup_impl(
                cluster_host=cluster_host,
                cluster_user=cluster_user,
                identity=identity,
            )
        # Pydantic Literal already constrains; this is a defensive fallback for
        # callers that invoke the function directly. Keep the same structured
        # shape (executor + diagnostic) as the other failure result above.
        return {
            "ok": False,
            "executor": executor,
            "reason": "unknown_executor",
            "diagnostic": f"Unknown executor {executor!r}; expected 'docker' or 'slurm'.",
        }

    @mcp.tool(
        name="submit_job",
        description=(
            "Submit a ModelOpt launcher YAML for execution. Mode is "
            "determined by mutually-exclusive args:\n"
            " - hf_local=<path> → Docker (local GPU)\n"
            " - cluster_host=<host> → Slurm (remote SSH)\n\n"
            "Returns the experiment_id (Slurm) or PID (Docker, "
            "experiment_id captured in Phase 2) immediately; the actual "
            "job runs detached. Poll status via job_status, fetch "
            "output via job_logs.\n\n"
            "Auto-verifies the executor first by default (skip_verify="
            "False is recommended unless you just called verify_setup)."
        ),
    )
    def submit_job(
        yaml_path: Annotated[
            str,
            Field(
                description=(
                    "Launcher YAML to submit. Pass an absolute path, a path "
                    "relative to tools/launcher/examples/, or one of the paths "
                    "returned by list_examples."
                )
            ),
        ],
        hf_local: Annotated[
            str | None,
            Field(
                description=(
                    "Local HF cache directory — when set, dispatches via "
                    "Docker. Mutually exclusive with cluster_host."
                )
            ),
        ] = None,
        cluster_host: Annotated[
            str | None,
            Field(
                description=(
                    "Slurm cluster login hostname — when set, dispatches via "
                    "remote SSH. Mutually exclusive with hf_local."
                )
            ),
        ] = None,
        cluster_user: Annotated[
            str | None,
            Field(description=("SSH user for the cluster. None uses launcher's default.")),
        ] = None,
        identity: Annotated[
            str | None,
            Field(description=("SSH identity file (-i). None uses ssh-agent / default key.")),
        ] = None,
        job_dir: Annotated[
            str | None,
            Field(
                description=(
                    "Override the experiment output directory. None uses the "
                    "launcher's per-mode default."
                )
            ),
        ] = None,
        job_name: Annotated[
            str | None,
            Field(description=("Override the job_name in the YAML. None uses the YAML's default.")),
        ] = None,
        extra_overrides: Annotated[
            dict[str, str] | None,
            Field(
                description=(
                    "Additional nemo-run-style overrides as a flat dict, e.g. "
                    "{'task.slurm_config.nodes': '2'}."
                )
            ),
        ] = None,
        skip_verify: Annotated[
            bool,
            Field(
                description=(
                    "If True, skip the verify_setup probe before submission. "
                    "Default False — the probe takes ~1s and saves you from "
                    "30+s of wasted submission time on bad config."
                )
            ),
        ] = False,
        dry_run: Annotated[
            bool,
            Field(
                description=(
                    "If True, run `launch.py --dryrun --yes` to validate that "
                    "the YAML parses, the factory resolves, and any "
                    "referenced files exist — without contacting the "
                    "cluster, spawning a container, or running sbatch. "
                    "Used by verify-task workflow stages (deployment_support, "
                    "hidden_state_dump_support, mlm_eval, ...) that only "
                    "need to confirm a YAML compiles. Returns "
                    "`{ok, dry_run: True, validated: bool, diagnostic?: str, "
                    "exit_code: int|None, stdout_tail: str, stderr_tail: str, "
                    "argv: list[str]}` with no `experiment_id`. Skips "
                    "verify_setup automatically — "
                    "no cluster contact happens in dry-run. `hf_local` / "
                    "`cluster_host` are optional in this mode (pass one to "
                    "validate executor-specific config, omit both to validate "
                    "just the YAML shape)."
                )
            ),
        ] = False,
    ) -> dict:
        """Forward every argument, by keyword, to ``bridge.submit_job_impl``."""
        return bridge.submit_job_impl(
            yaml_path=yaml_path,
            hf_local=hf_local,
            cluster_host=cluster_host,
            cluster_user=cluster_user,
            identity=identity,
            job_dir=job_dir,
            job_name=job_name,
            extra_overrides=extra_overrides,
            skip_verify=skip_verify,
            dry_run=dry_run,
        )

    @mcp.tool(
        name="job_status",
        description=(
            "Read filesystem-based status from a nemo_run experiment "
            "dir. Returns 'done' / 'failed' / 'running' based on "
            "presence of _DONE and contents of status_*.out files. "
            "Per-task statuses also surfaced for multi-task pipelines."
        ),
    )
    def job_status(
        experiment_id: Annotated[
            str,
            Field(
                description=(
                    "The experiment id returned by submit_job (Slurm) or the "
                    "name nemo_run assigned to the experiment dir."
                )
            ),
        ],
    ) -> dict:
        """Forward to ``bridge.job_status_impl``."""
        return bridge.job_status_impl(experiment_id)

    @mcp.tool(
        name="job_logs",
        description=(
            "Read log_<task>.out files from the experiment dir. If "
            "task=None, returns logs for all tasks. If tail=N, returns "
            "only the last N lines per task."
        ),
    )
    def job_logs(
        experiment_id: Annotated[str, Field(description=("The experiment id."))],
        task: Annotated[
            str | None,
            Field(description=("Specific task name to filter logs by. None returns all.")),
        ] = None,
        tail: Annotated[
            int | None,
            Field(
                ge=1,
                description=(
                    "Return only the last N lines per task. Must be >= 1 when set; "
                    "None returns the full log."
                ),
            ),
        ] = None,
    ) -> dict:
        """Forward to ``bridge.job_logs_impl``."""
        return bridge.job_logs_impl(experiment_id, task, tail)

    @mcp.tool(
        name="wait_for_experiment",
        description=(
            "Block until an experiment reaches a terminal status "
            "('done' or 'failed') or the timeout elapses. Returns the "
            "same shape as job_status plus a `waited_seconds` field. "
            "Use this instead of writing your own polling while-loop "
            "around job_status."
        ),
    )
    def wait_for_experiment(
        experiment_id: Annotated[
            str,
            Field(description="The experiment id from submit_job."),
        ],
        timeout_sec: Annotated[
            int,
            Field(
                ge=1,
                description=(
                    "Max seconds to wait before returning "
                    "`{ok: False, reason: 'wait_timeout'}`. Default 7200 "
                    "(2 hours) — large PTQ runs need this. Cap at your "
                    "agent's own deadline."
                ),
            ),
        ] = 7200,
        poll_interval_sec: Annotated[
            int,
            Field(
                ge=1,
                description=(
                    "Seconds between status polls. Default 30 — "
                    "filesystem-based status is cheap so the poll "
                    "doesn't have to be slow."
                ),
            ),
        ] = 30,
    ) -> dict:
        """Forward to ``bridge.wait_for_experiment_impl``."""
        return bridge.wait_for_experiment_impl(
            experiment_id,
            timeout_sec,
            poll_interval_sec,
        )

    @mcp.tool(
        name="provision_passwordless_ssh_dry_run",
        description=(
            "Emit the exact commands the operator should run to set up "
            "passwordless SSH to a slurm cluster. Does NOT execute "
            "them and does NOT handle passwords — the MCP wire is "
            "unsafe for cluster credentials. Two-phase:\n\n"
            "1. If the SSH private key is missing → emit "
            "`ssh-keygen` command. Operator runs it, re-invokes this "
            "tool.\n"
            "2. If the key exists → emit `ssh-copy-id` command + "
            "public-key content. Operator runs it (this is the only "
            "step that needs a cluster password, prompted by ssh).\n\n"
            "After both steps complete, the `next_check` field "
            "recommends calling `verify_setup(executor='slurm', ...)` "
            "to confirm key-auth now works. Use this when "
            "`verify_setup` returns `ssh_auth_failed` and you want to "
            "tell the operator how to fix it."
        ),
    )
    def provision_passwordless_ssh_dry_run(
        cluster_host: Annotated[
            str,
            Field(description="Slurm cluster login hostname."),
        ],
        cluster_user: Annotated[
            str | None,
            Field(
                description=("SSH user for the cluster. None uses the local user."),
            ),
        ] = None,
        identity: Annotated[
            str | None,
            Field(
                description=(
                    "Path to the SSH private key to inspect. None "
                    "uses $IDENTITY env var, then ~/.ssh/id_ed25519."
                ),
            ),
        ] = None,
    ) -> dict:
        """Forward to ``bridge.provision_passwordless_ssh_dry_run_impl``."""
        return bridge.provision_passwordless_ssh_dry_run_impl(
            cluster_host,
            cluster_user,
            identity,
        )

    @mcp.tool(
        name="read_cluster_artifact",
        description=(
            "Read an artifact from a remote experiment via nemo_run's "
            "tunnel. nemo_run already knows the cluster host + user + "
            "identity from the executor metadata stored alongside the "
            "experiment — this tool does NOT take cluster credentials.\n\n"
            "Two modes:\n"
            "* `path=None, job_idx=N` → fetch the job's log via "
            "`nemo experiment logs <id> <N>`.\n"
            "* `path='<rel>'` → read the named relative path inside "
            "the remote experiment dir via the executor's tunnel.\n\n"
            "Returns the file content truncated to 8 KB (same cap as "
            "the launcher's `log_excerpt`). Use this for files like "
            "`specbench_results.json` that the launcher writes on the "
            "cluster's lustre but the MCP server (running locally) can't "
            "directly read."
        ),
    )
    def read_cluster_artifact(
        experiment_id: Annotated[
            str,
            Field(description="The experiment id from submit_job."),
        ],
        path: Annotated[
            str | None,
            Field(
                description=(
                    "Relative path inside the experiment dir. None = "
                    "log-fetch mode via `nemo experiment logs`."
                ),
            ),
        ] = None,
        job_idx: Annotated[
            int,
            Field(
                ge=0,
                description=(
                    "Job index for log-fetch mode. Default 0 = the first task in the pipeline."
                ),
            ),
        ] = 0,
    ) -> dict:
        """Forward to ``bridge.read_cluster_artifact_impl``."""
        return bridge.read_cluster_artifact_impl(experiment_id, path, job_idx)

    @mcp.tool(
        name="open_draft_pr",
        description=(
            "Push the agent's current branch + open a draft PR on the "
            "named target repo. Preconditions enforced by the caller "
            "(NOT this tool): the agent's working tree is at the branch "
            "it wants to PR, commits exist, and any required DCO "
            "`Signed-off-by:` trailer is in place.\n\n"
            "Steps internally: `git push -u origin HEAD` + "
            "`gh pr create --draft --repo <target> --title --body --base`. "
            "Returns `pr_url` parsed from gh's stdout on success, or "
            "structured `reason` on failure (git_push_failed, "
            "gh_pr_create_failed, etc.)."
        ),
    )
    def open_draft_pr(
        target_repo: Annotated[
            str,
            Field(
                description=("GitHub repo slug, e.g. 'NVIDIA/Model-Optimizer'."),
            ),
        ],
        title: Annotated[
            str,
            Field(description="PR title."),
        ],
        body: Annotated[
            str,
            Field(description="PR description body (Markdown)."),
        ],
        base_branch: Annotated[
            str,
            Field(
                description=("Base branch to PR against. Default 'main'."),
            ),
        ] = "main",
        cwd: Annotated[
            str | None,
            Field(
                description=(
                    "Path to the git checkout. None uses the MCP "
                    "server's cwd (usually the agent's working dir)."
                ),
            ),
        ] = None,
    ) -> dict:
        """Forward to ``bridge.open_draft_pr_impl``."""
        return bridge.open_draft_pr_impl(
            target_repo,
            title,
            body,
            base_branch,
            cwd,
        )

    return mcp
def main() -> None:
    """Entry point for the `modelopt-mcp` console_script.

    Configures logging on stderr (stdout is reserved for the stdio
    transport), then builds the server and runs it.

    The log level is read from the ``MODELOPT_MCP_LOG`` environment
    variable and defaults to ``INFO``. Level names are matched
    case-insensitively and a decimal numeric level is accepted. An
    unrecognised value falls back to ``INFO`` with a warning rather than
    aborting startup with ``ValueError``.
    """
    requested = os.environ.get("MODELOPT_MCP_LOG", "INFO").strip()
    if requested.isdecimal():
        level = int(requested)
    else:
        # getLevelName() maps a registered level *name* to its numeric value
        # and returns a "Level X" string for names it does not know.
        level = logging.getLevelName(requested.upper())
    unknown_level = not isinstance(level, int)
    if unknown_level:
        level = logging.INFO
    # Resolve the level BEFORE basicConfig: it installs the handler first and
    # only then sets the level, so a failed call cannot simply be retried.
    logging.basicConfig(
        stream=sys.stderr,
        level=level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    if unknown_level:
        logger.warning("Ignoring unrecognised MODELOPT_MCP_LOG=%r; using INFO.", requested)
    mcp = _build_server()
    mcp.run()  # stdio by default


if __name__ == "__main__":
    main()