[Low priority feat] Add rerun rate-limit error resume support #1351

sgunasekar wants to merge 2 commits into
Conversation
Signed-off-by: suriya <sgunasekar@nvidia.com>
Force-pushed from 3f52982 to a6559b6
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (5)
🚧 Files skipped from review as they are similar to previous changes (3)
📝 Walkthrough
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing Touches
📝 Generate docstrings
🧪 Generate unit tests (beta)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
nemo_skills/pipeline/utils/generation.py (1)
185-191: ⚠️ Potential issue | 🟠 Major

Reject `--rerun-done` and `--rerun-rate-limit-error` together.

Line 190 returns before any selective rewrite happens, so passing both flags silently turns this into a full rerun and ignores the new option. Please fail fast here instead of accepting a user argument that never takes effect.
Suggested guard
```diff
 def get_remaining_jobs(cluster_config, output_dir, random_seeds, chunk_ids, rerun_done, rerun_ratelimit_errors=False):
+    if rerun_done and rerun_ratelimit_errors:
+        raise ValueError("--rerun-done and --rerun-rate-limit-error are mutually exclusive")
     if rerun_done:
         return {seed: copy.deepcopy(chunk_ids) for seed in random_seeds}
```

As per coding guidelines, "Avoid cases where user-passed parameters are unused; code should fail if user specifies an unsupported argument or if a required argument is missing. Use dataclass or **kwargs syntax to handle this automatically."
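The suggested guard can be exercised as a standalone sketch. The function below is illustrative only: it reproduces just the two `rerun_done` branches from the snippet with a trimmed signature, not the real `get_remaining_jobs` from `nemo_skills`.

```python
# Illustrative sketch of the fail-fast guard; only the rerun_done branches
# from the suggested diff are reproduced, the rest of the function is elided.
import copy


def get_remaining_jobs(random_seeds, chunk_ids, rerun_done=False, rerun_ratelimit_errors=False):
    if rerun_done and rerun_ratelimit_errors:
        raise ValueError("--rerun-done and --rerun-rate-limit-error are mutually exclusive")
    if rerun_done:
        # full rerun: every seed gets a fresh copy of all chunk ids
        return {seed: copy.deepcopy(chunk_ids) for seed in random_seeds}
    return {}


full_rerun = get_remaining_jobs([0, 1], [0, 1, 2], rerun_done=True)
try:
    get_remaining_jobs([0, 1], [0, 1, 2], rerun_done=True, rerun_ratelimit_errors=True)
    conflict_raised = False
except ValueError:
    conflict_raised = True
```

With the guard in place, the conflicting flag combination raises immediately instead of silently degrading to a full rerun.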
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nemo_skills/pipeline/utils/generation.py` around lines 185 - 191, The function get_remaining_jobs currently returns early when rerun_done is True, which silently ignores rerun_ratelimit_errors; update get_remaining_jobs to fail fast when both rerun_done and rerun_ratelimit_errors are True by checking "if rerun_done and rerun_ratelimit_errors:" near the top and raising a clear exception (e.g., ValueError) that explains the flags are mutually exclusive, so callers are informed instead of silently performing a full rerun; keep the existing behavior for single flags unchanged.

nemo_skills/pipeline/eval.py (1)
399-420: ⚠️ Potential issue | 🟠 Major

Judge reruns don't see the new flag.
This only forwards `rerun_ratelimit_errors` into `prepare_eval_commands()`. The later judge paths still call `_generate(...)` / `judge_step_fn(...)` without it, so `ns eval --rerun-rate-limit-error` can regenerate benchmark rows while reusing stale judge outputs and stale summary metrics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nemo_skills/pipeline/eval.py` around lines 399 - 420, The judge rerun flag rerun_ratelimit_errors is passed into prepare_eval_commands but not propagated into the later judge/generation paths, so calls to _generate(...) and judge_step_fn(...) must accept and be invoked with the rerun_ratelimit_errors parameter; update the signatures of _generate and judge_step_fn (and any intermediary functions invoked in the judge path) to add a rerun_ratelimit_errors parameter and pass the existing rerun_ratelimit_errors variable into those calls so that ns eval --rerun-rate-limit-error triggers regeneration rather than reusing stale judge outputs and summary metrics.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 54bb34f5-f2ba-4418-8e00-5a8c6a5a46b9
📒 Files selected for processing (5)
- nemo_skills/pipeline/eval.py
- nemo_skills/pipeline/generate.py
- nemo_skills/pipeline/utils/eval.py
- nemo_skills/pipeline/utils/generation.py
- tests/test_generation.py
```python
if rerun_ratelimit_errors:
    for seed in random_seeds:
        for chunk_id in list(missing_jobs.get(seed, [])):
            # for partially done seed/chunk_id combo rewrite the current -async file by simply dropping ratelimit error rows
            output_file = get_chunked_rs_filename(status_dir, random_seed=seed, chunk_id=chunk_id)
            _rewrite_async_resume_file_if_needed(output_file)

    for seed in random_seeds:
        for chunk_id in list(done_jobs[seed]):
            # for fully done seed/chunk_id combo - if the chunks have been merged, there is no easy way to rerun just the ratelimit error rows so raise and ask user to --rerun-done
            if chunk_id is not None and len(chunk_ids) > 1:
                merged_output_file = get_chunked_rs_filename(status_dir, random_seed=seed, chunk_id=None)
                if Path(merged_output_file).exists():
                    raise ValueError(
                        "Cannot use --rerun-rate-limit-error for completed chunked outputs because "
                        f"`{merged_output_file}` has already been merged. Use --rerun-done to fully rerun the seed."
                    )
            # for fully done seed/chunk_id combo - if the chunks have not been merged, rewrite the .jsonl to .jsonl-async file by dropping ratelimit error rows and remove .done files
            output_file = get_chunked_rs_filename(status_dir, random_seed=seed, chunk_id=chunk_id)
            if _rewrite_async_resume_file_if_needed(output_file):
                done_file = Path(expected_files[(seed, chunk_id)])
                if done_file.exists():
                    done_file.unlink()
                missing_jobs[seed].append(chunk_id)
                done_jobs[seed].remove(chunk_id)
```
This selective-rerun branch bypasses the cluster file abstraction.
The normal status probing above goes through `get_tunnel(...).run(...)`, but this path switches to local `Path.exists()`, `open()`, and `unlink()` against `status_dir`. On `executor == "slurm"`, those artifacts are remote, so `--rerun-rate-limit-error` can miss merged outputs and leave the real `*.done` / `*-async` files untouched.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_skills/pipeline/utils/generation.py` around lines 262 - 286, The
selective-rerun branch is directly touching local filesystem (Path.exists,
unlink, open) and must use the cluster file abstraction instead; update the
rerun_ratelimit_errors handling so all checks/manipulations of status_dir
artifacts call through get_tunnel(...).run(...) (or the existing tunnel API)
rather than using Path/Path.unlink/open locally: when checking
merged_output_file existence use the tunnel to test the remote file, call
_rewrite_async_resume_file_if_needed via the tunnel or change it to accept/use a
tunnel so it rewrites the remote .jsonl-async correctly, and replace
done_file.unlink() with a remote remove via the tunnel (and append to
missing_jobs / remove from done_jobs afterward as now). Reference
functions/variables: rerun_ratelimit_errors block, get_chunked_rs_filename,
_rewrite_async_resume_file_if_needed, expected_files, done_jobs, missing_jobs,
and get_tunnel.
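For illustration, the remote checks can be routed through a small command-running shim. `LocalTunnel` below is a hypothetical stand-in that executes commands via `subprocess` on the local machine; it is not the actual `get_tunnel(...)` API from `nemo_skills`, where a real tunnel would run the same commands on the cluster.

```python
# Hedged sketch: route existence checks and deletions through a tunnel-like
# object instead of local Path calls. LocalTunnel is a hypothetical stand-in
# that runs commands locally; a real slurm tunnel would run them remotely.
import subprocess
import tempfile
from pathlib import Path


class LocalTunnel:
    def run(self, cmd):
        return subprocess.run(cmd, shell=True, capture_output=True, text=True)


def remote_exists(tunnel, path) -> bool:
    # `test -f` exits 0 only when the file exists
    return tunnel.run(f"test -f {path}").returncode == 0


def remote_unlink(tunnel, path) -> None:
    tunnel.run(f"rm -f {path}")


tunnel = LocalTunnel()
with tempfile.TemporaryDirectory() as d:
    done_file = Path(d) / "output-rs0.done"
    done_file.write_text("")
    exists_before = remote_exists(tunnel, done_file)
    remote_unlink(tunnel, done_file)
    exists_after = remote_exists(tunnel, done_file)
```

Because every filesystem touch goes through `tunnel.run(...)`, the same code path works whether the status artifacts live locally or on a remote slurm filesystem.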
```python
def _rewrite_async_resume_file_if_needed(output_file: str, async_position_key: str = "_async_position") -> bool:
    output_path = Path(output_file)
    async_output_path = Path(f"{output_file}-async")
    if output_path.exists():
        source_path = output_path
        rewriting_existing_async = False
    elif async_output_path.exists():
        source_path = async_output_path
        rewriting_existing_async = True
    else:
        return False

    preserved_rows = []
    rerunnable_found = False
    with open(source_path, "rt", encoding="utf-8") as fin:
        for idx, line in enumerate(fin):
            row = json.loads(line)
            if _is_rerunnable_ratelimit_error(row):
                rerunnable_found = True
                continue
            if not rewriting_existing_async or async_position_key not in row:
                row[async_position_key] = idx
            preserved_rows.append(row)

    if not rerunnable_found:
        return False

    with open(async_output_path, "wt", encoding="utf-8") as fout:
        for row in preserved_rows:
            fout.write(json.dumps(row) + "\n")
    if not rewriting_existing_async:
        output_path.unlink()
    LOG.info(
```
Don't rewrite the only -async file in place.
When `source_path == async_output_path`, opening it with `"wt"` truncates the current resume state before the replacement is durable. Any write failure or interruption here loses the preserved-row metadata and makes the rerun unrecoverable; write to a temp file and `replace()` it atomically instead.
As per coding guidelines, "When adding new benchmarks, avoid data loss by doing all computation before re-opening files for writing; ensure computation completes before file writes to prevent accidental data loss if code fails."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_skills/pipeline/utils/generation.py` around lines 347 - 379, The
function _rewrite_async_resume_file_if_needed currently opens async_output_path
with "wt" which truncates the file when source_path == async_output_path;
instead compute preserved_rows and rerunnable_found first (as you already do),
then when writing, create a temporary file in the same directory (e.g.,
async_output_path.with_suffix(".tmp") or similar), open that temp for writing
and write all preserved_rows to it, fsync if desired, then atomically replace
the original using Path.replace(temp_path, async_output_path); only unlink
output_path if not rewriting_existing_async after the atomic replace — this
ensures no in-place truncation of async_output_path and preserves resume state
on failures.
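The temp-file-plus-replace pattern the comment asks for can be sketched as follows. `rewrite_atomically` is a hypothetical helper for illustration, not code from this PR.

```python
# Hedged sketch of the atomic rewrite: write preserved rows to a temp file
# in the same directory, then os.replace() over the target, so an interrupted
# write can never truncate the only remaining resume file.
import json
import os
import tempfile
from pathlib import Path


def rewrite_atomically(async_output_path: Path, preserved_rows: list) -> None:
    # temp file must live in the same directory so os.replace stays atomic
    tmp_fd, tmp_name = tempfile.mkstemp(dir=async_output_path.parent, suffix=".tmp")
    try:
        with os.fdopen(tmp_fd, "wt", encoding="utf-8") as fout:
            for row in preserved_rows:
                fout.write(json.dumps(row) + "\n")
            fout.flush()
            os.fsync(fout.fileno())
        os.replace(tmp_name, async_output_path)  # atomic on POSIX
    except BaseException:
        os.unlink(tmp_name)  # clean up the temp file; the original is untouched
        raise


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "out.jsonl-async"
    target.write_text('{"status": "ratelimit"}\n{"status": "ok"}\n')
    rewrite_atomically(target, [{"status": "ok"}])
    final_contents = target.read_text()
```

If anything fails before `os.replace()`, the original `-async` file is still intact, so the resume state survives a crash mid-rewrite.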
Signed-off-by: suriya <sgunasekar@nvidia.com>
Force-pushed from 4abb16d to 4b10a3a
Add `--rerun-rate-limit-error` as an optional flag in the generate and eval pipelines.

With `enable-soft-fail=true`, the errored rows are skipped during reruns unless the more aggressive `--rerun-done` and `++skip_filled=false` are used. Use `--rerun-rate-limit-error` to rerun only the saved rate-limit soft failures. If the `output[-rsX].jsonl` file has already been merged, there is no easy way to retrace the chunks and stage.

Summary by CodeRabbit

- Added `--rerun-rate-limit-error` and `--rerun-ratelimit-errors` CLI options to the `eval` and `generate` commands, enabling retry of jobs that failed due to rate limit errors even when marked complete.