Skip to content

feat: run eval, finetune via background jobs#1436

Draft
leonardmq wants to merge 20 commits into
leonard/kil-686-feat-background-job-systemfrom
leonard/kil-686-run-eval-via-job
Draft

feat: run eval, finetune via background jobs#1436
leonardmq wants to merge 20 commits into
leonard/kil-686-feat-background-job-systemfrom
leonard/kil-686-run-eval-via-job

Conversation

@leonardmq
Copy link
Copy Markdown
Collaborator

Summary

Routes the specs-page Run Eval flow through the new background job system, with a special pause-on-disconnect behavior scoped entirely to this endpoint (not baked into the job system itself).

  • One eval job per run-config: per-row Run Eval → 1 tracked job; "Run All Evals" → N jobs (one per run-config). The new SSE endpoint creates the jobs and aggregates their progress back into the legacy {progress, total, errors} + complete SSE shape so run_eval.svelte's counter dialog still works unchanged — only the URL is repointed.
  • Pause-on-disconnect: on client disconnect the endpoint fires a single detached cleanup task that cancels PENDING jobs first (so they leave the queue before any slot frees), then pauses RUNNING ones. Race-free under a small concurrency cap where Run-All exceeds it. On normal completion (data: complete sent) the cleanup is skipped.
  • Approval-gated (agent_policy_require_approval("Run eval comparison?")) to match the legacy run_comparison — it spends compute/credits.
  • No changes to EvalJob/EvalJobParams or run_comparison: new module app/desktop/studio_server/eval_jobs_api.py, new endpoint GET .../eval_config/{eval_config_id}/run_comparison_jobs?... mirroring run_comparison's params. Jobs are created via job_registry.create("eval", {dict}, ...) to avoid a circular import.

Bonus fix — shared iter_with_keepalive helper

Found and fixed a latent SSE bug with the same root cause in the existing /api/jobs/events stream: asyncio.wait_for(subscription.__anext__(), timeout) cancels the bus async-generator on each timeout, terminating its finally (unsubscribe) and ending the stream after one ping. For the eval stream that would have paused/cancelled live jobs on a still-connected client during any quiet window. Fixed by extracting iter_with_keepalive(subscription, timeout) into jobs/events.py — a feeder-task pattern where the keepalive wait_for operates on a local queue, never on the subscription itself. Used by both the eval-jobs stream and the existing /api/jobs/events stream.

Test plan

  • uv run ./checks.sh --agent-mode — passes
  • Backend: N-jobs-per-run-config + aggregate stream + complete; disconnect cancels pending + pauses running (with max_concurrent < N — the load-bearing case); deleted-job-unblocks-stream; keepalive does not tear down the subscription (real JobEventBus + monkeypatched short keepalive); normal-completion-doesn't-pause; empty-ids → 400; all_run_configs resolution
  • Frontend: run_eval.svelte builds the new URL for both explicit-ids and run-all
  • make annotations regenerated → new endpoint's annotation is approval-gated
  • Manual UI: click Run Eval (per-row + Run-All) — counter updates and shows complete; closing the page mid-run pauses RUNNING jobs and cancels PENDING ones; both show in the project-scoped jobs widget

🤖 Generated with Claude Code

Route the specs-page Run Eval flow through the background job system.
One eval job per run-config (Run All -> N jobs), aggregated into the
legacy SSE counter format. On client disconnect, pause running jobs
and cancel pending ones — kept entirely in the new endpoint, not in
the job system. Approval-gated to match the legacy run_comparison.

Also extracts a shared iter_with_keepalive helper so a quiet-window
keepalive can no longer cancel (and thus tear down) the underlying
bus subscription. Used by both the new eval-jobs SSE stream and the
existing /api/jobs/events stream, fixing a latent same-root-cause
reconnect-churn bug in the jobs widget too.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 29, 2026

Review Change Stack

Walkthrough

This PR adds a new background job-based API for running eval config comparisons with Server-Sent Event (SSE) streaming. A shared keepalive mechanism prevents SSE timeout during quiet periods. The implementation includes disconnect handling (canceling pending, pausing running jobs), comprehensive test coverage, frontend integration, and agent policy configuration.

Changes

Background job-based eval comparison with keepalive SSE streaming

Layer / File(s) Summary
Keepalive infrastructure for SSE streams
app/desktop/studio_server/jobs/events.py, app/desktop/studio_server/jobs/api.py
New KeepalivePing sentinel and iter_with_keepalive() helper yield periodic pings to prevent SSE timeout during quiet periods. Jobs API updated to use the helper instead of manual polling with asyncio.wait_for.
Eval jobs API with aggregation and lifecycle management
app/desktop/studio_server/eval_jobs_api.py, app/desktop/desktop_server.py
New /run_comparison_jobs endpoint subscribes to job events, maintains tracked job state, emits aggregated progress payloads, and handles client disconnects by canceling pending and pausing running jobs. Desktop server wires the API during app initialization.
Eval jobs API test suite with stubs and fixtures
app/desktop/studio_server/test_eval_jobs_api.py
Stub Pydantic models and deterministic pausable eval worker; isolated JobRegistry fixture; SSE parsing and status polling helpers; nine test cases covering job creation, input validation, SSE semantics, disconnect behaviors, event ordering robustness, and keepalive regression prevention.
Existing jobs API tests adapted to keepalive mechanism
app/desktop/studio_server/jobs/test_api.py
Reworked SSE tests to directly consume the _event_stream async generator, skipping httpx.ASGITransport streaming. New _read_sse_event_from_gen helper skips keepalive pings; four test functions updated for generator-based approach.
Frontend: component URL update and test coverage
app/web_ui/src/lib/components/run_eval.svelte, app/web_ui/src/lib/components/run_eval.test.ts
Component updated to use /run_comparison_jobs endpoint. Vitest tests stub EventSource and posthog-js, polyfill <dialog> methods, and verify both explicit run_config_ids and run_all query parameter scenarios.
API schema, types, and agent permissions
app/web_ui/src/lib/api_schema.d.ts, libs/server/kiln_server/utils/agent_checks/annotations/get_api_projects_project_id_tasks_task_id_evals_eval_id_eval_config_eval_config_id_run_comparison_jobs.json
Auto-generated TypeScript API schema for new endpoint including path/query parameters and response shapes. Agent policy config defines approval requirement ("Run eval comparison?").

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • Kiln-AI/Kiln#395: Modifies the same run_eval.svelte component URL construction for eval runs; may provide context on prior endpoint changes.

Suggested reviewers

  • scosman
  • chiang-daniel
  • tawnymanticore

Poem

🐰 A rabbit bounced through the code so clean,
Adding jobs that stream, serene and keen,
With keepalive pings to dodge the freeze,
And cleanup on disconnect with such ease,
Now evals compare in the background's breeze!

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 21.28% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The PR description is comprehensive and well-structured, covering objectives, implementation details, and test plan; however, it does not include the required CLA confirmation or completed checklists. Add CLA confirmation line (I, @username, confirm...) and check the test/checklist boxes to fully comply with the template.
✅ Passed checks (3 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed The pull request title 'feat: run eval, finetune via background jobs' is concise and directly related to the main changes in the changeset. It accurately summarizes the primary objective: introducing background job execution for eval and finetune operations.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch leonard/kil-686-run-eval-via-job

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a background job-based evaluation comparison API that streams aggregate progress via Server-Sent Events (SSE) and handles client disconnects gracefully. It also refactors the keepalive mechanism to prevent subscription teardown on timeouts. The review feedback highlights critical improvements: maintaining strong references to detached cleanup tasks to prevent garbage collection, adding defensive checks for optional progress fields, raising an error when no run configurations are found, and ensuring proper cleanup of partially created jobs if an exception occurs during initialization.

Comment on lines +43 to +56
def _schedule_cleanup_on_disconnect(job_ids: list[str]) -> None:
"""Fire-and-forget cleanup of this request's jobs on client disconnect.

Called from the dying SSE generator's teardown path. The work runs as a
single detached, ordered coroutine so it outlives the request's cancelled
scope and so the ordering below is guaranteed within one task (rather than
racing N independent tasks). Pausing a running job frees a concurrency slot
that `_dispatch_pending` would immediately hand to a still-`pending` job,
promoting it to RUNNING and running it to completion unattended — so we
cancel the pending jobs FIRST (removing them from the queue) and only then
pause the running ones.
"""
task = asyncio.create_task(_cleanup(job_ids))
task.add_done_callback(_log_cleanup_result)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Python's asyncio, background tasks created with asyncio.create_task that are not strongly referenced can be garbage collected mid-execution. Since task is only a local variable in _schedule_cleanup_on_disconnect, it is at risk of being garbage collected before _cleanup completes.

To prevent this, maintain a module-level set of active cleanup tasks, add the task to the set when scheduled, and discard it in a done callback.

Suggested change
def _schedule_cleanup_on_disconnect(job_ids: list[str]) -> None:
"""Fire-and-forget cleanup of this request's jobs on client disconnect.
Called from the dying SSE generator's teardown path. The work runs as a
single detached, ordered coroutine so it outlives the request's cancelled
scope and so the ordering below is guaranteed within one task (rather than
racing N independent tasks). Pausing a running job frees a concurrency slot
that `_dispatch_pending` would immediately hand to a still-`pending` job,
promoting it to RUNNING and running it to completion unattendedso we
cancel the pending jobs FIRST (removing them from the queue) and only then
pause the running ones.
"""
task = asyncio.create_task(_cleanup(job_ids))
task.add_done_callback(_log_cleanup_result)
_active_cleanup_tasks: set[asyncio.Task[None]] = set()
def _schedule_cleanup_on_disconnect(job_ids: list[str]) -> None:
"""Fire-and-forget cleanup of this request's jobs on client disconnect.
Called from the dying SSE generator's teardown path. The work runs as a
single detached, ordered coroutine so it outlives the request's cancelled
scope and so the ordering below is guaranteed within one task (rather than
racing N independent tasks). Pausing a running job frees a concurrency slot
that `_dispatch_pending` would immediately hand to a still-`pending` job,
promoting it to RUNNING and running it to completion unattendedso we
cancel the pending jobs FIRST (removing them from the queue) and only then
pause the running ones.
"""
task = asyncio.create_task(_cleanup(job_ids))
_active_cleanup_tasks.add(task)
task.add_done_callback(_active_cleanup_tasks.discard)
task.add_done_callback(_log_cleanup_result)

Comment on lines +32 to +36
def _aggregate_payload(jobs: dict[str, JobRecord]) -> str:
progress = sum(j.progress.success for j in jobs.values())
total = sum((j.progress.total or 0) for j in jobs.values())
errors = sum(j.progress.error for j in jobs.values())
return json.dumps({"progress": progress, "total": total, "errors": errors})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The progress field on JobRecord is optional and can be None (especially before a job starts or if it is not provided in test fixtures). Accessing j.progress.success directly without a guard will raise an AttributeError.

Add a defensive check to ensure j.progress is not None before accessing its attributes.

Suggested change
def _aggregate_payload(jobs: dict[str, JobRecord]) -> str:
progress = sum(j.progress.success for j in jobs.values())
total = sum((j.progress.total or 0) for j in jobs.values())
errors = sum(j.progress.error for j in jobs.values())
return json.dumps({"progress": progress, "total": total, "errors": errors})
def _aggregate_payload(jobs: dict[str, JobRecord]) -> str:
progress = sum(j.progress.success for j in jobs.values() if j.progress)
total = sum((j.progress.total or 0) for j in jobs.values() if j.progress)
errors = sum(j.progress.error for j in jobs.values() if j.progress)
return json.dumps({"progress": progress, "total": total, "errors": errors})

Comment on lines +245 to +255
if all_run_configs:
run_config_id_list = [
str(rc.id) for rc in get_all_run_configs(project_id, task_id)
]
else:
if len(run_config_ids) == 0:
raise HTTPException(
status_code=400,
detail="No run config ids provided. At least one run config id is required.",
)
run_config_id_list = run_config_ids
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If all_run_configs is True but no run configurations exist for the task, get_all_run_configs will return an empty list. This results in an empty run_config_id_list, which would attempt to start an SSE stream with zero jobs.

Raise an HTTPException with status code 400 if no run configurations are found, matching the behavior when run_config_ids is empty.

        if all_run_configs:
            run_config_id_list = [
                str(rc.id) for rc in get_all_run_configs(project_id, task_id)
            ]
            if len(run_config_id_list) == 0:
                raise HTTPException(
                    status_code=400,
                    detail="No run configurations found for this task.",
                )
        else:
            if len(run_config_ids) == 0:
                raise HTTPException(
                    status_code=400,
                    detail="No run config ids provided. At least one run config id is required.",
                )
            run_config_id_list = run_config_ids

Comment on lines +257 to +270
job_ids: list[str] = []
for run_config_id in run_config_id_list:
job = await job_registry.create(
"eval",
{
"project_id": project_id,
"task_id": task_id,
"eval_id": str(eval.id),
"eval_config_id": eval_config_id,
"run_config_id": run_config_id,
},
project_id=project_id,
)
job_ids.append(job.id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If job_registry.create raises an exception for one of the run configurations in the loop (e.g., due to validation or registry errors), any jobs created in previous iterations will continue running in the background. Since the CancellableStreamingResponse is never returned, the client disconnect cleanup will not be scheduled, leaving these jobs running unattended.

Wrap the job creation loop in a try...except block and clean up any already-created jobs if an exception occurs.

Suggested change
job_ids: list[str] = []
for run_config_id in run_config_id_list:
job = await job_registry.create(
"eval",
{
"project_id": project_id,
"task_id": task_id,
"eval_id": str(eval.id),
"eval_config_id": eval_config_id,
"run_config_id": run_config_id,
},
project_id=project_id,
)
job_ids.append(job.id)
job_ids: list[str] = []
try:
for run_config_id in run_config_id_list:
job = await job_registry.create(
"eval",
{
"project_id": project_id,
"task_id": task_id,
"eval_id": str(eval.id),
"eval_config_id": eval_config_id,
"run_config_id": run_config_id,
},
project_id=project_id,
)
job_ids.append(job.id)
except Exception:
if job_ids:
await _cleanup(job_ids)
raise

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 29, 2026

📊 Coverage Report

Overall Coverage: 92%

Diff: origin/leonard/kil-686-feat-background-job-system...HEAD

  • app/desktop/desktop_server.py (100%)
  • app/desktop/studio_server/eval_api.py (100%)
  • app/desktop/studio_server/eval_jobs_api.py (90.1%): Missing lines 32,94-95,100-101,112-115,178,219
  • app/desktop/studio_server/finetune_api.py (100%)
  • app/desktop/studio_server/jobs/api.py (87.5%): Missing lines 103
  • app/desktop/studio_server/jobs/events.py (96.2%): Missing lines 75
  • app/desktop/studio_server/jobs/models.py (100%)
  • app/desktop/studio_server/jobs/registry.py (100%)
  • app/desktop/studio_server/jobs/workers/finetune.py (98.0%): Missing lines 107
  • libs/core/kiln_ai/adapters/fine_tune/dataset_formatter.py (100%)

Summary

  • Total: 236 lines
  • Missing: 14 lines
  • Coverage: 94%

Line-by-line

View line-by-line diff coverage

app/desktop/studio_server/eval_jobs_api.py

Lines 28-36

  28 
  29 
  30 def _job_from_event(event: JobEvent) -> JobRecord | None:
  31     if event.event != "job":
! 32         return None
  33     return JobRecord.model_validate(event.data)
  34 
  35 
  36 def _aggregate_payload(jobs: dict[str, JobRecord]) -> str:

Lines 90-105

   90     # A job that changed status mid-cleanup can raise JobOperationError; swallow
   91     # and log it so one stale job never aborts cleanup of the rest.
   92     try:
   93         await coro
!  94     except JobOperationError:
!  95         logger.info(
   96             "Skipping %s for eval job %s on disconnect (status changed)",
   97             label,
   98             job_id,
   99         )
! 100     except Exception:
! 101         logger.warning(
  102             "Failed to %s for eval job %s on client disconnect",
  103             label,
  104             job_id,
  105             exc_info=True,

Lines 108-119

  108 
  109 def _log_cleanup_result(task: asyncio.Task) -> None:
  110     try:
  111         task.result()
! 112     except asyncio.CancelledError:
! 113         pass
! 114     except Exception:
! 115         logger.warning("Eval job disconnect cleanup task failed", exc_info=True)
  116 
  117 
  118 async def _aggregate_progress_stream(
  119     project_id: str, job_ids: list[str]

Lines 174-182

  174                     tracked.pop(deleted_id, None)
  175             else:
  176                 record = _job_from_event(event)
  177                 if record is None or record.id not in id_set:
! 178                     continue
  179                 tracked[record.id] = record
  180 
  181             if len(tracked) < len(id_set):
  182                 continue

Lines 215-223

  215     now visible in the jobs panel."""
  216     eval_config = eval_config_from_id(project_id, task_id, eval_id, eval_config_id)
  217     eval = eval_config.parent_eval()
  218     if eval is None:
! 219         raise HTTPException(status_code=404, detail="Eval config has no parent eval.")
  220 
  221     if all_run_configs:
  222         run_config_id_list = [
  223             str(rc.id) for rc in get_all_run_configs(project_id, task_id)

app/desktop/studio_server/jobs/api.py

Lines 99-107

   99         project_id=project_id,
  100     )
  101     async for item in iter_with_keepalive(subscription, KEEPALIVE_SECONDS):
  102         if isinstance(item, KeepalivePing):
! 103             yield ": ping\n\n"
  104         else:
  105             yield _format_sse(item)
  106 

app/desktop/studio_server/jobs/events.py

Lines 71-79

  71                 # the subscription) is untouched and keeps draining.
  72                 yield KEEPALIVE_PING
  73                 continue
  74             if item is None:
! 75                 break
  76             yield item
  77     finally:
  78         feeder.cancel()
  79         try:

app/desktop/studio_server/jobs/workers/finetune.py

Lines 103-111

  103             raise FinetuneStatusUnknownError(
  104                 f"Provider '{finetune.provider}' is not available for fine-tuning"
  105             ) from exc
  106         if provider_name not in finetune_registry:
! 107             raise FinetuneStatusUnknownError(
  108                 f"Provider '{finetune.provider}' is not available for fine-tuning"
  109             )
  110         adapter_cls = finetune_registry[provider_name]
  111         return await adapter_cls(finetune).status()


Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
app/desktop/studio_server/test_eval_jobs_api.py (1)

233-239: ⚡ Quick win

Make the disconnect tests state-driven instead of sleep-driven.

read_stream() exits on the first data: line, so by the time Line 235 and Line 302 run the task is usually already finished and reader.cancel() is a no-op. That makes both tests depend on timing rather than proving the intended RUNNING/PENDING state before disconnect. Wait on explicit job states, or gate the stub worker with a test event, then close the stream.

Also applies to: 300-306

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/test_eval_jobs_api.py` around lines 233 - 239, The
tests use a sleep to wait for read_stream() to start, which makes
reader.cancel() a timing-dependent no-op; change them to be state-driven by
waiting for the job to reach the expected state (e.g., poll the job status until
state == "RUNNING" or "PENDING") or by gating the stub worker with an
asyncio.Event that the test sets when ready, then proceed to close the SSE
stream and cancel reader; update the test code around read_stream(), the task
creation (reader = asyncio.create_task(read_stream())), and the teardown/cancel
logic to await the explicit condition instead of asyncio.sleep so the test
deterministically exercises the disconnect behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/desktop/studio_server/eval_jobs_api.py`:
- Around line 245-270: The code may create duplicate eval jobs because
run_config_id_list can contain repeated IDs; before looping and calling
job_registry.create (in the block using run_config_id_list, which is set from
either get_all_run_configs(...) or run_config_ids), deduplicate
run_config_id_list while preserving order (e.g., convert to an ordered-unique
sequence) so each run_config_id is processed once; update references to
run_config_id_list (and the branch that builds it from
get_all_run_configs/project input) to use the deduplicated list before the for
run_config_id in run_config_id_list loop.

---

Nitpick comments:
In `@app/desktop/studio_server/test_eval_jobs_api.py`:
- Around line 233-239: The tests use a sleep to wait for read_stream() to start,
which makes reader.cancel() a timing-dependent no-op; change them to be
state-driven by waiting for the job to reach the expected state (e.g., poll the
job status until state == "RUNNING" or "PENDING") or by gating the stub worker
with an asyncio.Event that the test sets when ready, then proceed to close the
SSE stream and cancel reader; update the test code around read_stream(), the
task creation (reader = asyncio.create_task(read_stream())), and the
teardown/cancel logic to await the explicit condition instead of asyncio.sleep
so the test deterministically exercises the disconnect behavior.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b8bfdb85-5698-43f4-a660-c3432e6d1678

📥 Commits

Reviewing files that changed from the base of the PR and between fc5f6f6 and 1c2f8d6.

📒 Files selected for processing (10)
  • app/desktop/desktop_server.py
  • app/desktop/studio_server/eval_jobs_api.py
  • app/desktop/studio_server/jobs/api.py
  • app/desktop/studio_server/jobs/events.py
  • app/desktop/studio_server/jobs/test_api.py
  • app/desktop/studio_server/test_eval_jobs_api.py
  • app/web_ui/src/lib/api_schema.d.ts
  • app/web_ui/src/lib/components/run_eval.svelte
  • app/web_ui/src/lib/components/run_eval.test.ts
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_projects_project_id_tasks_task_id_evals_eval_id_eval_config_eval_config_id_run_comparison_jobs.json

Comment on lines +245 to +270
if all_run_configs:
run_config_id_list = [
str(rc.id) for rc in get_all_run_configs(project_id, task_id)
]
else:
if len(run_config_ids) == 0:
raise HTTPException(
status_code=400,
detail="No run config ids provided. At least one run config id is required.",
)
run_config_id_list = run_config_ids

job_ids: list[str] = []
for run_config_id in run_config_id_list:
job = await job_registry.create(
"eval",
{
"project_id": project_id,
"task_id": task_id,
"eval_id": str(eval.id),
"eval_config_id": eval_config_id,
"run_config_id": run_config_id,
},
project_id=project_id,
)
job_ids.append(job.id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Deduplicate requested run-config IDs before creating jobs.

Repeated run_config_ids currently schedule repeated eval jobs for the same run config, which breaks the one-job-per-run-config contract and can burn extra compute/credits.

💡 Proposed fix
         if all_run_configs:
             run_config_id_list = [
                 str(rc.id) for rc in get_all_run_configs(project_id, task_id)
             ]
         else:
             if len(run_config_ids) == 0:
                 raise HTTPException(
                     status_code=400,
                     detail="No run config ids provided. At least one run config id is required.",
                 )
             run_config_id_list = run_config_ids
+
+        run_config_id_list = list(dict.fromkeys(run_config_id_list))
 
         job_ids: list[str] = []
         for run_config_id in run_config_id_list:
             job = await job_registry.create(
                 "eval",
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/eval_jobs_api.py` around lines 245 - 270, The code
may create duplicate eval jobs because run_config_id_list can contain repeated
IDs; before looping and calling job_registry.create (in the block using
run_config_id_list, which is set from either get_all_run_configs(...) or
run_config_ids), deduplicate run_config_id_list while preserving order (e.g.,
convert to an ordered-unique sequence) so each run_config_id is processed once;
update references to run_config_id_list (and the branch that builds it from
get_all_run_configs/project input) to use the deduplicated list before the for
run_config_id in run_config_id_list loop.

@leonardmq leonardmq marked this pull request as draft May 29, 2026 20:50
leonardmq and others added 19 commits June 1, 2026 17:16
Introduce a discriminated-union convention `metadata.tag` (eval / rag /
finetune / prompt_optimization, room to grow) carrying feature-level
identity. Producers attach the tag at create time; consumers match on
it without knowing the worker's params schema. The job system itself
stays oblivious — metadata remains free-form pass-through.

Also: the Run Eval button on the specs page now reflects status from
the project-scoped jobs SSE — if a matching eval job is pending or
running it shows "Running…" with the live aggregate counter; paused
and terminal jobs are excluded so the button returns to "Run Eval"
and a re-click triggers an idempotent fresh run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the eval-specific run_eval_awareness helper with generic,
tag-aware store selectors (`filter_by_tag`, `ongoing`, `is_ongoing`,
`aggregate`) that any "Run X" feature can reuse. Encapsulate the
metadata.tag attachment in a per-kind creator (`create_eval_job`)
so producers can never forget it; the dialog and the test bench
now use the creator directly.

Fix the paused-button bug by making the store authoritative for
"is this running?". A `compute_run_state(store_running, initiating,
local)` helper enforces: store-ongoing → "running" (always);
local "complete" / "complete_with_errors" → show terminal display
when nothing's ongoing; otherwise "not_started". An `initiating`
flag bridges the click-to-store-update window so the button
doesn't flicker. Paused → not ongoing → button shows "Run Eval"
and a fresh idempotent click can resume.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The store-derived rejoin counter wasn't pulling its weight: the
in-session SSE already aggregates server-side into the legacy
{progress, total, errors} shape (so per-row Run Eval and Run-All
counters keep working live), and on rejoin the Jobs widget is the
canonical place to see per-job progress. The button just says
"Running…" — that's enough.

Removed aggregate / AggregateProgress and is_ongoing (the latter
was a dupe of `ongoing(...).length > 0`). The dialog now uses local
counters only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds back_url_for(record) in job_tags.ts — derives the canonical
"back to source" page for a job from its metadata.tag. Renders the
Type cell in the jobs table as a link when one is defined.

The eval tag now carries task_id (required) and spec_id (optional)
so back_url_for can build the compare_run_configs URL. The Run-Eval-
via-job SSE endpoint accepts an optional spec_id query param and
stashes it on each created job's tag; run_eval.svelte propagates it
from $page.params via its callers (run_config_comparison_table,
the compare page). The Jobs-panel dialog doesn't have spec_id and
falls back to the task page.

The shared jobs dialog auto-closes on any client-side navigation
(via beforeNavigate) so clicking a back-link doesn't leave the
modal overlaying the destination page.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
JobRecord now carries a `name` field (adjective-noun like
"swift-falcon", "amber-meadow") generated by the registry at create
time. Not unique — `id` remains the canonical key — just a more
human label than `j_kwyevdvylgaz` for the UI.

Producers populate `metadata.display = {primary, secondary}` with a
per-kind summary at create time; the jobs table renders it verbatim
in a new Details column so the table stays generic across job kinds.
For eval jobs, the run-comparison-jobs endpoint stashes the eval /
judge / run-config names — e.g. "Eval: My Eval" /
"Judge: GPT-4 Judge · Run config: Default".

Jobs table columns are now: Name · Type · Details · Status · Progress
· Message · Created · Actions. The Name cell is a link back to the
job's source page (via back_url_for); the cryptic id column is gone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…w-up PR)

Lays the groundwork so the future finetune integration is purely
additive (no churn on shared files).

- FinetuneTag now carries task_id (alongside finetune_id) so the
  back URL can be built.
- back_url_for resolves a finetune tag to its detail page:
  /fine_tune/{project}/{task}/fine_tune/{finetune}.
- finetune_tag(...) builder + create_finetune_job(params, display?)
  skeleton — declares the tag shape; the next PR adds the actual
  FinetuneJobWorker (compute_state reads the FineTune entity's
  provider-mirrored status; run() polls the provider until terminal)
  and the endpoint that calls this creator.

RAG is intentionally still placeholder; bigger / weirder, separate PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@leonardmq leonardmq changed the title feat: run eval via background jobs (pause on disconnect) feat: run eval, finetune via background jobs Jun 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant