Skip to content

feat: background job system (tracked async jobs + jobs UI)#1435

Draft
leonardmq wants to merge 7 commits into
mainfrom
leonard/kil-686-feat-background-job-system
Draft

feat: background job system (tracked async jobs + jobs UI)#1435
leonardmq wants to merge 7 commits into
mainfrom
leonard/kil-686-feat-background-job-system

Conversation

@leonardmq
Copy link
Copy Markdown
Collaborator

Summary

Adds a generic background-job system for the local Kiln app, plus the jobs UI that surfaces it. Jobs run as in-process asyncio tasks that are tracked, controllable (pause/resume/cancel/delete), observable over SSE, and decoupled from any HTTP connection — closing the UI or dropping the stream never stops a job.

Core design principle: a job record is ephemeral, in-memory bookkeeping — never a source of truth. The authoritative state lives in the Kiln project entities a job touches (eval runs, task runs). Workers are idempotent and re-derive their progress via a compute_state() read of those entities, so there is no disk persistence of job state and re-triggering after a crash/restart is safe.

What's included

  • Core layer (app/desktop/studio_server/jobs/): in-memory JobRegistry (semaphore + FIFO, supervising task per job, compute_state reconciliation), BackgroundJobStatus state machine (no interrupted/checkpoints), an in-process event bus, and a best-effort per-run error log in the OS temp dir.
  • REST + SSE (api.py): create / list / get / result / errors / pause / resume / cancel / delete under /api/jobs, plus a pure-observer SSE stream (/api/jobs/events) and a wait capability (GET /api/jobs/{id}/wait and POST /api/jobs/{type}?wait=true) — awaiting observes, never owns, so abandoning the await leaves the job running.
  • Workers: NoopJob (validation/canary) and EvalJob (wraps the existing EvalRunner unchanged; idempotent, so supports_pause = True). Git-sync-aware via a request-free save_context_for_project so background eval writes are committed/pushed for auto-sync projects.
  • Frontend: a global "In progress" sidebar widget (subtle spinner + count) that opens a wide jobs dialog; a shared jobs table reused by the dialog and the /jobs page; a run-eval dialog to start eval jobs from the jobs panel.
  • Specs: full spec set + phase plans under specs/projects/background_job_system/.
  • Agent-check annotations regenerated for the new /api/jobs endpoints.

Notes / follow-ups

  • Wiring the existing specs-page "Run Eval" button through the job system (with pause-on-disconnect) is a separate follow-up branch off this one — not in this PR.
  • Job state is process-scoped by design (in-memory only); recovery = re-trigger (idempotent).

Test plan

  • uv run ./checks.sh --agent-mode passes (ruff, ty, pytest, web lint/check/test/build, schema check)
  • Backend unit/integration tests: registry lifecycle (pause/resume/cancel/delete, reconciliation, semaphore), SSE decoupling, wait (incl. abandon-doesn't-cancel), eval worker idempotency + compute_state, git-sync save_context
  • Frontend tests: jobs store/SSE, indicator states, clear-completed, run-eval dialog, dialog open-signal
  • Manual UI pass (visual): sidebar widget states, dialog layout, live pause/resume/cancel against a running server

🤖 Generated with Claude Code

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 29, 2026

Review Change Stack

Walkthrough

Implements a full background jobs system: server-side models/registry/events/error logs and FastAPI/SSE endpoints; adds noop/eval workers and git-sync helpers; integrates Web UI schema, client, stores, components, and navigation; includes agent approval annotations and extensive tests and specs.

Changes

Background Jobs end-to-end

Layer / File(s) Summary
Core job domain: models, event bus, error log, registry (singleton)
app/desktop/studio_server/jobs/models.py, .../events.py, .../error_log.py, .../registry.py, .../tests/*
Defines job data models and worker contracts, async filtered event bus, per-run JSONL error logs, and in-memory JobRegistry with lifecycle, concurrency, reconciliation, wait, and event emission; includes comprehensive tests.
FastAPI Jobs API, SSE, desktop wiring, and agent annotations
app/desktop/studio_server/jobs/api.py, app/desktop/desktop_server.py, libs/server/kiln_server/server.py, libs/server/kiln_server/utils/agent_checks/annotations/*, app/desktop/studio_server/jobs/test_api.py
Exposes REST endpoints and SSE stream, registers workers, connects into desktop server, adds OpenAPI tag and agent-approval annotations; end-to-end API/SSE tests added.
Workers (noop, eval) and git-sync save context
app/desktop/studio_server/jobs/workers/{noop.py,eval.py}, app/desktop/git_sync/{save_context.py,middleware.py}, app/desktop/studio_server/jobs/workers/test_eval.py, app/desktop/git_sync/test_save_context.py
Implements Noop/Eval workers; eval integrates EvalRunner and per-project save-context; introduces request-free git-sync save-context; adds unit tests.
Web UI schema, client API, stores, and status helpers
app/web_ui/src/lib/api_schema.d.ts, .../stores/{jobs_api.ts,jobs_store.ts,job_status.ts,jobs_dialog.ts}, .../stores/*test.ts
Extends schema typings; adds jobs API client; implements SSE-backed jobs store with derived outputs; adds status/action/formatting helpers and tests.
Web UI components, pages, navigation, and eval-run UI
app/web_ui/src/lib/components/*, app/web_ui/src/routes/(app)/*, app/web_ui/src/lib/ui/{dialog.svelte,icons/jobs_icon.svelte}, .../tests/*
Adds sidebar indicator, jobs dialog/table, jobs page with “test job”, run-eval dialog and workflow helpers, sidebar rail integration, icon, and dialog width option; includes component/page tests.
Specifications and plans for the background job system
specs/projects/background_job_system/*
Adds architecture, functional specs, and phased implementation plans with project overview.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant UI as Web UI
  participant API as FastAPI /api/jobs
  participant Reg as JobRegistry
  participant Bus as JobEventBus
  participant SSE as SSE Stream

  UI->>API: POST /api/jobs/{type} (params, project_id)
  API->>Reg: create(type, params)
  Reg-->>Bus: publish_job (PENDING/RUNNING updates)
  UI-->>SSE: GET /api/jobs/events?project_id=...
  SSE-->>UI: event: snapshot
  Bus-->>SSE: job/deleted events
  SSE-->>UI: event: job/deleted
  UI->>API: POST /api/jobs/{id}/pause|resume|cancel or DELETE /api/jobs/{id}
  API->>Reg: lifecycle op, wait(optional)
  Reg-->>Bus: publish_job (status/progress/result)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~180 minutes

Possibly related PRs

  • Kiln-AI/Kiln#1252: Introduces Git Auto Sync manager/registry used by the new git-sync save-context helpers used by the Eval worker.

Suggested reviewers

  • sfierro
  • tawnymanticore

Poem

I thump my paw—jobs hop into queue,
A spinner twirls, then snapshots bloom anew.
From noop naps to evals bold,
The SSE breeze tells stories told.
Click—pause, resume; carrots to pursue—
Results pop in, errors bid adieu.
Hippity-hop, your backlog flew! 🥕✨

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch leonard/kil-686-feat-background-job-system

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a robust background job system to the desktop server, allowing long-running tasks like evaluations to run asynchronously in-process. It adds an in-memory JobRegistry to manage job lifecycles, concurrency, and state reconciliation, alongside a FastAPI router and an SSE stream for real-time UI updates. Additionally, it integrates Svelte components, stores, and dialogs to display and control these jobs in the frontend. The review feedback highlights a critical bug in the SSE stream where using asyncio.wait_for on the generator's __anext__() terminates the stream after the first keepalive ping; the reviewer suggests handling the timeout inside the generator itself. Another recommendation is to defensively wrap worker.compute_state in a try...except block to prevent 500 errors if underlying entities are deleted.

Comment on lines +81 to +97
subscription: AsyncGenerator[JobEvent, None] = job_registry.events.subscribe(
job_id=job_id,
type_name=type_name,
project_id=project_id,
)
try:
while True:
try:
event = await asyncio.wait_for(
subscription.__anext__(), timeout=KEEPALIVE_SECONDS
)
except asyncio.TimeoutError:
yield ": ping\n\n"
continue
except StopAsyncIteration:
break
yield _format_sse(event)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using asyncio.wait_for on subscription.__anext__() will raise asyncio.TimeoutError when a timeout occurs, but it also cancels the underlying __anext__() coroutine. In Python, cancelling __anext__() on an active async generator throws a CancelledError inside the generator, which executes its finally block and terminates the generator. As a result, the next call to subscription.__anext__() in the loop will immediately raise StopAsyncIteration, causing the event stream to terminate after the very first keepalive ping (15 seconds of inactivity).

To fix this, handle the timeout inside the subscribe generator itself by passing a timeout parameter to it, and yielding a special "ping" event from the generator when a timeout occurs.

    subscription = job_registry.events.subscribe(
        job_id=job_id,
        type_name=type_name,
        project_id=project_id,
        timeout=KEEPALIVE_SECONDS,
    )
    try:
        async for event in subscription:
            if event.event == "ping":
                yield ": ping\n\n"
            else:
                yield _format_sse(event)

Comment on lines +14 to +15
event: Literal["snapshot", "job", "deleted"]
data: dict[str, Any]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Add "ping" to the JobEvent event type literal to support keepalive ping events generated on timeout.

Suggested change
event: Literal["snapshot", "job", "deleted"]
data: dict[str, Any]
event: Literal["snapshot", "job", "deleted", "ping"]
data: dict[str, Any]

Comment on lines +71 to +86
async def subscribe(
self,
job_id: str | None = None,
type_name: str | None = None,
project_id: str | None = None,
) -> AsyncGenerator[JobEvent, None]:
subscriber = _Subscriber(job_id, type_name, project_id)
self._subscribers.add(subscriber)
try:
snapshot = self._filtered_snapshot(subscriber)
yield JobEvent(
event="snapshot",
data={"jobs": [r.model_dump(mode="json") for r in snapshot]},
)
while True:
yield await subscriber.queue.get()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the subscribe method to accept an optional timeout parameter. When a timeout is specified, use asyncio.wait_for on subscriber.queue.get() inside the loop, and yield a "ping" event if a timeout occurs. This avoids cancelling the generator's __anext__() from the outside, which would otherwise terminate the generator prematurely.

    async def subscribe(
        self,
        job_id: str | None = None,
        type_name: str | None = None,
        project_id: str | None = None,
        timeout: float | None = None,
    ) -> AsyncGenerator[JobEvent, None]:
        subscriber = _Subscriber(job_id, type_name, project_id)
        self._subscribers.add(subscriber)
        try:
            snapshot = self._filtered_snapshot(subscriber)
            yield JobEvent(
                event="snapshot",
                data={"jobs": [r.model_dump(mode="json") for r in snapshot]},
            )
            while True:
                if timeout is not None:
                    try:
                        yield await asyncio.wait_for(subscriber.queue.get(), timeout=timeout)
                    except asyncio.TimeoutError:
                        yield JobEvent(event="ping", data={})
                else:
                    yield await subscriber.queue.get()

Comment on lines +444 to +447
params = worker.params_model.model_validate(job.params)
derived = await worker.compute_state(params)
if derived is None:
return False
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Wrap the call to worker.compute_state(params) in a try...except block. If an underlying entity (such as a project or task) is deleted or temporarily unavailable, compute_state may raise an exception. Propagating this exception will cause the entire GET /api/jobs/{id} request to fail with a 500 error, preventing the user from viewing or deleting the job. Handling the exception defensively ensures the API remains robust and returns the last known in-memory state.

Suggested change
params = worker.params_model.model_validate(job.params)
derived = await worker.compute_state(params)
if derived is None:
return False
params = worker.params_model.model_validate(job.params)
try:
derived = await worker.compute_state(params)
except Exception:
logger.exception("Failed to compute state for job %s", job.id)
return False
if derived is None:
return False

@github-actions
Copy link
Copy Markdown

📊 Coverage Report

Overall Coverage: 92%

Diff: origin/main...HEAD

  • app/desktop/desktop_server.py (100%)
  • app/desktop/git_sync/save_context.py (100%)
  • app/desktop/studio_server/jobs/api.py (100%)
  • app/desktop/studio_server/jobs/error_log.py (90.9%): Missing lines 57-58,66-67
  • app/desktop/studio_server/jobs/events.py (94.2%): Missing lines 39,60,64
  • app/desktop/studio_server/jobs/models.py (98.7%): Missing lines 180
  • app/desktop/studio_server/jobs/registry.py (93.8%): Missing lines 58-63,186,195,205,223,256,342-347,411,415,443
  • app/desktop/studio_server/jobs/workers/eval.py (96.6%): Missing lines 132,135
  • app/desktop/studio_server/jobs/workers/noop.py (100%)

Summary

  • Total: 734 lines
  • Missing: 30 lines
  • Coverage: 95%

Line-by-line

View line-by-line diff coverage

app/desktop/studio_server/jobs/error_log.py

Lines 53-62

  53                 except (ValueError, TypeError):
  54                     continue
  55                 if isinstance(parsed, dict):
  56                     entries.append(parsed)
! 57     except Exception:
! 58         return entries
  59     return entries
  60 
  61 
  62 def delete_errors(run_id: str) -> None:

Lines 62-68

  62 def delete_errors(run_id: str) -> None:
  63     """Best-effort remove the error log file for a run. Swallows all errors."""
  64     try:
  65         error_log_path(run_id).unlink(missing_ok=True)
! 66     except Exception:
! 67         pass

app/desktop/studio_server/jobs/events.py

Lines 35-43

  35     ) -> bool:
  36         if self.job_id is not None and self.job_id != record_id:
  37             return False
  38         if self.type_name is not None and self.type_name != record_type:
! 39             return False
  40         if self.project_id is not None and self.project_id != record_project_id:
  41             return False
  42         return True

Lines 56-68

  56         self._subscribers: set[_Subscriber] = set()
  57         self._snapshot_provider = snapshot_provider
  58 
  59     def set_snapshot_provider(self, provider: SnapshotProvider) -> None:
! 60         self._snapshot_provider = provider
  61 
  62     def _filtered_snapshot(self, subscriber: _Subscriber) -> list[JobRecord]:
  63         if self._snapshot_provider is None:
! 64             return []
  65         return [
  66             record
  67             for record in self._snapshot_provider()
  68             if subscriber.matches(record.id, record.type, record.project_id)

app/desktop/studio_server/jobs/models.py

Lines 176-181

  176         """MUST be idempotent. Covers both first run and resume — the registry
  177         calls run() again to resume a paused job; the worker re-orients via
  178         compute_state(), not a handed-in checkpoint.
  179         """
! 180         raise NotImplementedError

app/desktop/studio_server/jobs/registry.py

Lines 54-67

  54     if explicit is not None:
  55         return explicit
  56     raw = os.environ.get(MAX_CONCURRENT_ENV_VAR)
  57     if raw:
! 58         try:
! 59             value = int(raw)
! 60             if value > 0:
! 61                 return value
! 62         except ValueError:
! 63             pass
  64     return DEFAULT_MAX_CONCURRENT
  65 
  66 
  67 class JobRegistry:

Lines 182-190

  182 
  183     def _fresh_job_id(self) -> str:
  184         job_id = _new_job_id()
  185         while job_id in self._jobs:
! 186             job_id = _new_job_id()
  187         return job_id
  188 
  189     def _validate_params(
  190         self, worker: JobWorker, params: dict[str, Any] | BaseModel

Lines 191-199

  191     ) -> BaseModel:
  192         if isinstance(params, worker.params_model):
  193             return params
  194         if isinstance(params, BaseModel):
! 195             params = params.model_dump()
  196         return worker.params_model.model_validate(params)
  197 
  198     # -- dispatch / supervision ---------------------------------------------

Lines 201-209

  201         while self._running_count < self._max_concurrent and self._pending_ids:
  202             job_id = self._pending_ids.pop(0)
  203             job = self._jobs.get(job_id)
  204             if job is None or job.status != BackgroundJobStatus.PENDING:
! 205                 continue
  206             self._launch(job)
  207 
  208     def _launch(self, job: JobRecord) -> None:
  209         worker = self.worker_for(job.type)

Lines 219-227

  219 
  220     async def _supervise(self, job_id: str, worker: JobWorker, run_id: str) -> None:
  221         job = self._jobs.get(job_id)
  222         if job is None:
! 223             return
  224         params = worker.params_model.model_validate(job.params)
  225         ctx = self._build_context(job_id, run_id)
  226         try:
  227             try:

Lines 252-260

  252     def _build_context(self, job_id: str, run_id: str) -> JobContext:
  253         async def report_progress(update: JobProgressUpdate) -> None:
  254             job = self._jobs.get(job_id)
  255             if job is None or job.run_id != run_id:
! 256                 return
  257             job.progress = JobProgress(
  258                 total=update.total if update.total is not None else job.progress.total,
  259                 success=update.success,
  260                 error=update.error,

Lines 338-351

  338         worker = self.worker_for(job.type)
  339         params = worker.params_model.model_validate(job.params)
  340         derived = await worker.compute_state(params)
  341         if derived is not None and derived.is_complete:
! 342             self._apply_derived(job, derived)
! 343             job.status = BackgroundJobStatus.SUCCEEDED
! 344             job.ended_at = _utc_now()
! 345             self._touch(job)
! 346             self._emit(job)
! 347             return job
  348         self._apply_derived(job, derived)
  349         job.status = BackgroundJobStatus.PENDING
  350         self._touch(job)
  351         self._pending_ids.append(job_id)

Lines 407-419

  407         try:
  408             await task
  409         except asyncio.CancelledError:
  410             pass
! 411         except Exception:
  412             # The worker raised while we awaited its cancellation. _supervise
  413             # already routed this to the failed/terminal state and logged it;
  414             # we only debug-log here so it isn't silently discarded.
! 415             logger.debug(
  416                 "Worker for job %s raised during cancel await", job_id, exc_info=True
  417             )
  418         # If the task was cancelled before its coroutine body ever ran, its own
  419         # finally never executed, so reclaim the slot here. Idempotent: whoever

Lines 439-447

  439 
  440     async def _reconcile(self, job: JobRecord, emit_on_change: bool) -> bool:
  441         worker = self._workers.get(job.type)
  442         if worker is None:
! 443             return False
  444         params = worker.params_model.model_validate(job.params)
  445         derived = await worker.compute_state(params)
  446         if derived is None:
  447             return False

app/desktop/studio_server/jobs/workers/eval.py

Lines 128-137

  128 
  129     def _eval_and_task(self, eval_config: EvalConfig) -> tuple[Eval, Task]:
  130         eval = eval_config.parent_eval()
  131         if eval is None:
! 132             raise ValueError("Eval config has no parent eval")
  133         task = eval.parent_task()
  134         if task is None:
! 135             raise ValueError("Eval has no parent task")
  136         return eval, task


Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (5)
app/desktop/studio_server/jobs/api.py (3)

138-160: ⚡ Quick win

Query parameter name type shadows builtin and inconsistent with internal naming.

Line 142 uses type as the query parameter name but the registry method at line 156 expects type_name. While FastAPI handles the mapping, this creates unnecessary cognitive load.

Use type_name consistently
     async def list_jobs(
         status: Annotated[
             BackgroundJobStatus | None, Query(description="Filter by job status.")
         ] = None,
-        type: Annotated[str | None, Query(description="Filter by job type.")] = None,
+        type_name: Annotated[str | None, Query(description="Filter by job type.")] = None,
         project_id: Annotated[
             str | None, Query(description="Filter by project id.")
         ] = None,
         since: Annotated[
             datetime | None,
             Query(description="Only jobs created at or after this ISO-8601 time."),
         ] = None,
         limit: Annotated[
             int | None, Query(description="Maximum number of jobs to return.")
         ] = None,
     ) -> list[JobRecord]:
         return job_registry.list_jobs(
             status=status,
-            type_name=type,
+            type_name=type_name,
             project_id=project_id,
             since=since,
             limit=limit,
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/api.py` around lines 138 - 160, The list_jobs
endpoint uses a query parameter named type which shadows a builtin and
mismatches the registry API; rename the parameter in the list_jobs function
signature to type_name (update its Annotated/Query description accordingly) and
pass type_name into job_registry.list_jobs so the parameter name matches the
internal API (keep other parameters and behavior unchanged).

114-130: 💤 Low value

SSE endpoint param name type shadows Python builtin.

Line 118 names the query parameter type, which shadows the Python builtin type. While FastAPI function parameters are local scope and this is safe, it reduces readability and could confuse linters or IDEs.

Rename to type_name for consistency
     async def stream_job_events(
         job_id: Annotated[
             str | None, Query(description="Only stream events for this job id.")
         ] = None,
-        type: Annotated[
+        type_name: Annotated[
             str | None, Query(description="Only stream events for this job type.")
         ] = None,
         project_id: Annotated[
             str | None, Query(description="Only stream events for this project id.")
         ] = None,
     ) -> CancellableStreamingResponse:
         """Server-sent events for jobs. Emits an initial `snapshot`, then per-job
         `job` and `deleted` events. A pure observer: disconnecting never stops a job."""
         return CancellableStreamingResponse(
-            content=_event_stream(job_id, type, project_id),
+            content=_event_stream(job_id, type_name, project_id),
             media_type="text/event-stream",
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/api.py` around lines 114 - 130, Rename the
query parameter named type in the stream_job_events function to type_name (or
similar) to avoid shadowing the built-in type; update the function signature
parameter and all usages passed into _event_stream (currently
content=_event_stream(job_id, type, project_id)) to use type_name so the SSE
filter behavior remains unchanged while improving readability and avoiding
linter/IDE confusion.

170-213: ⚡ Quick win

Path parameter type shadows builtin throughout create_job endpoint.

Similar issue as other endpoints - using type as parameter name shadows the builtin and creates inconsistency with registry method calls.

Rename type to type_name in path and logic
     async def create_job(
-        type: Annotated[str, Path(description="The registered job type to run.")],
+        type_name: Annotated[str, Path(description="The registered job type to run.")],
         request: CreateJobRequest,
         wait: Annotated[
             bool,
             Query(
                 description="When true, block until the job reaches a terminal "
                 "state and return the full JobRecord instead of CreateJobResponse."
             ),
         ] = False,
         timeout: Annotated[
             float | None,
             Query(
                 ge=0,
                 description="Seconds to wait when wait=true (504 on timeout). "
                 "Omit to wait indefinitely.",
             ),
         ] = None,
     ) -> CreateJobResponse | JobRecord:
         try:
-            worker = job_registry.worker_for(type)
+            worker = job_registry.worker_for(type_name)
         except JobOperationError:
-            raise HTTPException(status_code=404, detail=f"Unknown job type: {type}")
+            raise HTTPException(status_code=404, detail=f"Unknown job type: {type_name}")

         try:
             validated = worker.params_model.model_validate(request.params)
         except ValidationError as exc:
             raise HTTPException(status_code=422, detail=exc.errors())

         job = await job_registry.create(
-            type_name=type,
+            type_name=type_name,
             params=validated,
             project_id=request.project_id or _project_id_from_params(validated),
             metadata=request.metadata,
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/api.py` around lines 170 - 213, The path
parameter name shadows Python builtin: rename the parameter in create_job from
type to type_name (update the Path annotation) and then use type_name everywhere
in the function (worker = job_registry.worker_for(type_name),
job_registry.create(type_name=type_name, ...), error messages, and any other
references), and adjust the 404 HTTPException detail to reference type_name;
ensure wait/timeout logic and variable names remain unchanged aside from this
renaming.
app/desktop/studio_server/jobs/registry.py (1)

257-264: 💤 Low value

Preserve total and message defensively in progress updates.

Lines 258 and 261-263 preserve existing total and message when the update provides None, but only using a ternary check. This works correctly per the functional spec, but it's fragile if a worker accidentally passes empty string or 0 for message/total.

Alternative: explicit None check
 job.progress = JobProgress(
-    total=update.total if update.total is not None else job.progress.total,
+    total=update.total if update.total is not None else job.progress.total,
     success=update.success,
     error=update.error,
-    message=update.message
-    if update.message is not None
-    else job.progress.message,
+    message=update.message if update.message is not None else job.progress.message,
 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/registry.py` around lines 257 - 264, The
progress update logic in registry.JobProgress assignment uses truthy ternaries
which will incorrectly overwrite valid falsy values (0 or empty string); change
the checks for update.total and update.message to explicit None checks (e.g.,
use "if update.total is not None else job.progress.total" and "if update.message
is not None else job.progress.message") when constructing the new JobProgress so
that valid falsy values from update.total and update.message are preserved;
update the JobProgress creation in the code that sets job.progress accordingly.
app/desktop/studio_server/jobs/test_registry.py (1)

130-157: ⚡ Quick win

Test worker uses mutable class attributes that may leak between tests.

SwallowCancelWorker (and similarly TotalThenNoneWorker, RaceCompleteWorker, ReconcileCompleteWorker) store started and gate as class attributes. If a test fails to set these or if tests run in unexpected order, stale Event objects could cause confusing failures.

Consider resetting these in a fixture or making them instance attributes:

Reset in autouse fixture
+@pytest.fixture(autouse=True)
+def reset_worker_gates():
+    """Reset class-level Event gates before each test."""
+    for worker_cls in [SwallowCancelWorker, TotalThenNoneWorker, RaceCompleteWorker]:
+        if hasattr(worker_cls, 'gate'):
+            worker_cls.gate = asyncio.Event()
+        if hasattr(worker_cls, 'started'):
+            worker_cls.started = asyncio.Event()
+    ReconcileCompleteWorker.done = False
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/test_registry.py` around lines 130 - 157,
Tests create shared asyncio.Event objects as class attributes (started and gate)
on SwallowCancelWorker (and TotalThenNoneWorker, RaceCompleteWorker,
ReconcileCompleteWorker), which can leak state between tests; make these
per-instance instead by initializing them in the worker constructor or at start
of run (e.g., set self.started = asyncio.Event() and self.gate = asyncio.Event()
before awaiting) or alternatively add an autouse test fixture that resets those
class attributes to fresh asyncio.Event() objects before each test to ensure no
stale events leak across tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/desktop/git_sync/save_context.py`:
- Around line 59-66: The current call to BackgroundSync.notify_request() once is
insufficient to prevent the poller from treating a long-running
GitSyncManager.atomic_write() as idle (because BackgroundSync uses
_last_request_time), so ensure the poller is truly paused for the duration of
the atomic write: locate where GitSyncRegistry.get_background_sync(...) returns
bg_sync and instead of a single notify_request(), call a proper pause API on
BackgroundSync (e.g., bg_sync.pause()) before invoking manager.atomic_write()
and bg_sync.resume() after the factory completes (or, if no pause API exists,
add one that sets a paused flag checked by the poll loop or maintain a scoped
counter/lock in BackgroundSync that prevents remote->local polling while
atomic_write runs); ensure you reference BackgroundSync, notify_request,
_last_request_time, GitSyncManager.atomic_write, and
GitSyncRegistry.get_background_sync when making the change.

In `@app/desktop/studio_server/jobs/registry.py`:
- Around line 405-407: The code should not treat task.cancel()'s True as proof
the task was delivered cancelled; instead call task.cancel() to request
cancellation, then await the task and only add job_id to self._cancel_delivered
when you observe the cancellation actually occurred (e.g., in the except
asyncio.CancelledError: handler or after await if task.cancelled() is True).
Update the block around task.cancel(), the subsequent await/try/except that
awaits the asyncio.Task (the same Task object referenced as task), and move the
self._cancel_delivered.add(job_id) into the CancelledError handling (or an
explicit post-await check) so delivery is recorded only upon confirmed
cancellation.

In `@app/desktop/studio_server/jobs/workers/eval.py`:
- Around line 42-104: compute_state currently performs blocking filesystem/model
loads via task.runs(readonly=True) and eval_config.runs(readonly=True) which can
stall the event loop; move the materialization and set-building (the calls that
iterate and load runs and build in_filter_ids and scored_ids) into a
thread/executor (e.g. asyncio.to_thread or run_in_executor) and return only the
computed primitive totals/sets to the async function so compute_state does only
lightweight set math and returns JobDerivedState; specifically wrap the logic
that calls task.runs and eval_config.runs and constructs in_filter_ids and
scored_ids into a synchronous helper (or inline closure) executed via
asyncio.to_thread, then use its result to compute success/total/is_complete in
compute_state.

In `@app/web_ui/src/routes/`(app)/+layout.svelte:
- Around line 431-444: Remove the redundant aria-label on the button so the
computed accessible name includes the visible "In progress" text plus the nested
SidebarJobsIndicator count; locate the button element that calls
jobs_dialog.open() (the one wrapping JobsIcon and SidebarJobsIndicator) and
delete the aria-label="Background jobs" attribute, ensuring SidebarJobsIndicator
still provides its own aria-label for the active count if needed.

---

Nitpick comments:
In `@app/desktop/studio_server/jobs/api.py`:
- Around line 138-160: The list_jobs endpoint uses a query parameter named type
which shadows a builtin and mismatches the registry API; rename the parameter in
the list_jobs function signature to type_name (update its Annotated/Query
description accordingly) and pass type_name into job_registry.list_jobs so the
parameter name matches the internal API (keep other parameters and behavior
unchanged).
- Around line 114-130: Rename the query parameter named type in the
stream_job_events function to type_name (or similar) to avoid shadowing the
built-in type; update the function signature parameter and all usages passed
into _event_stream (currently content=_event_stream(job_id, type, project_id))
to use type_name so the SSE filter behavior remains unchanged while improving
readability and avoiding linter/IDE confusion.
- Around line 170-213: The path parameter name shadows Python builtin: rename
the parameter in create_job from type to type_name (update the Path annotation)
and then use type_name everywhere in the function (worker =
job_registry.worker_for(type_name), job_registry.create(type_name=type_name,
...), error messages, and any other references), and adjust the 404
HTTPException detail to reference type_name; ensure wait/timeout logic and
variable names remain unchanged aside from this renaming.

In `@app/desktop/studio_server/jobs/registry.py`:
- Around line 257-264: The progress update logic in registry.JobProgress
assignment uses truthy ternaries which will incorrectly overwrite valid falsy
values (0 or empty string); change the checks for update.total and
update.message to explicit None checks (e.g., use "if update.total is not None
else job.progress.total" and "if update.message is not None else
job.progress.message") when constructing the new JobProgress so that valid falsy
values from update.total and update.message are preserved; update the
JobProgress creation in the code that sets job.progress accordingly.

In `@app/desktop/studio_server/jobs/test_registry.py`:
- Around line 130-157: Tests create shared asyncio.Event objects as class
attributes (started and gate) on SwallowCancelWorker (and TotalThenNoneWorker,
RaceCompleteWorker, ReconcileCompleteWorker), which can leak state between
tests; make these per-instance instead by initializing them in the worker
constructor or at start of run (e.g., set self.started = asyncio.Event() and
self.gate = asyncio.Event() before awaiting) or alternatively add an autouse
test fixture that resets those class attributes to fresh asyncio.Event() objects
before each test to ensure no stale events leak across tests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: ba341ed9-0074-4960-8133-31fb77aea3e0

📥 Commits

Reviewing files that changed from the base of the PR and between 2dee24e and fc5f6f6.

📒 Files selected for processing (64)
  • app/desktop/desktop_server.py
  • app/desktop/git_sync/middleware.py
  • app/desktop/git_sync/save_context.py
  • app/desktop/git_sync/test_save_context.py
  • app/desktop/studio_server/jobs/__init__.py
  • app/desktop/studio_server/jobs/api.py
  • app/desktop/studio_server/jobs/error_log.py
  • app/desktop/studio_server/jobs/events.py
  • app/desktop/studio_server/jobs/models.py
  • app/desktop/studio_server/jobs/registry.py
  • app/desktop/studio_server/jobs/test_api.py
  • app/desktop/studio_server/jobs/test_error_log.py
  • app/desktop/studio_server/jobs/test_events.py
  • app/desktop/studio_server/jobs/test_registry.py
  • app/desktop/studio_server/jobs/workers/__init__.py
  • app/desktop/studio_server/jobs/workers/eval.py
  • app/desktop/studio_server/jobs/workers/noop.py
  • app/desktop/studio_server/jobs/workers/test_eval.py
  • app/web_ui/src/lib/api_schema.d.ts
  • app/web_ui/src/lib/components/SidebarJobsIndicator.svelte
  • app/web_ui/src/lib/components/SidebarJobsIndicator.test.ts
  • app/web_ui/src/lib/components/jobs_dialog.component.test.ts
  • app/web_ui/src/lib/components/jobs_dialog.svelte
  • app/web_ui/src/lib/components/jobs_table.svelte
  • app/web_ui/src/lib/components/jobs_table.test.ts
  • app/web_ui/src/lib/stores/job_status.test.ts
  • app/web_ui/src/lib/stores/job_status.ts
  • app/web_ui/src/lib/stores/jobs_api.test.ts
  • app/web_ui/src/lib/stores/jobs_api.ts
  • app/web_ui/src/lib/stores/jobs_dialog.test.ts
  • app/web_ui/src/lib/stores/jobs_dialog.ts
  • app/web_ui/src/lib/stores/jobs_store.test.ts
  • app/web_ui/src/lib/stores/jobs_store.ts
  • app/web_ui/src/lib/ui/dialog.svelte
  • app/web_ui/src/lib/ui/icons/jobs_icon.svelte
  • app/web_ui/src/routes/(app)/+layout.svelte
  • app/web_ui/src/routes/(app)/jobs/+page.svelte
  • app/web_ui/src/routes/(app)/jobs/run_eval_dialog.svelte
  • app/web_ui/src/routes/(app)/jobs/run_eval_dialog.test.ts
  • app/web_ui/src/routes/(app)/jobs/run_eval_job.test.ts
  • app/web_ui/src/routes/(app)/jobs/run_eval_job.ts
  • app/web_ui/src/routes/(app)/sidebar_rail.svelte
  • app/web_ui/src/routes/(app)/sidebar_rail_item.svelte
  • app/web_ui/src/routes/(app)/sidebar_rail_item.test.ts
  • libs/server/kiln_server/server.py
  • libs/server/kiln_server/utils/agent_checks/annotations/delete_api_jobs_id.json
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_jobs.json
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_jobs_events.json
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_jobs_id.json
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_jobs_id_errors.json
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_jobs_id_result.json
  • libs/server/kiln_server/utils/agent_checks/annotations/get_api_jobs_id_wait.json
  • libs/server/kiln_server/utils/agent_checks/annotations/post_api_jobs_id_cancel.json
  • libs/server/kiln_server/utils/agent_checks/annotations/post_api_jobs_id_pause.json
  • libs/server/kiln_server/utils/agent_checks/annotations/post_api_jobs_id_resume.json
  • libs/server/kiln_server/utils/agent_checks/annotations/post_api_jobs_type.json
  • specs/projects/background_job_system/architecture.md
  • specs/projects/background_job_system/functional_spec.md
  • specs/projects/background_job_system/implementation_plan.md
  • specs/projects/background_job_system/phase_plans/phase_1.md
  • specs/projects/background_job_system/phase_plans/phase_2.md
  • specs/projects/background_job_system/phase_plans/phase_3.md
  • specs/projects/background_job_system/phase_plans/phase_4.md
  • specs/projects/background_job_system/project_overview.md

Comment on lines +59 to +66
bg_sync = GitSyncRegistry.get_background_sync(manager.repo_path)
if bg_sync is not None:
bg_sync.notify_request()

def factory():
return manager.atomic_write(context=context)

return factory
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Idle-pause behavior of background sync + notify_request semantics
fd -t f -p 'git_sync' --exec rg -nP -C3 'def notify_request|idle|pause' 
# Does atomic_write itself notify or trigger push?
ast-grep --pattern 'async def atomic_write($$$):
  $$$'

Repository: Kiln-AI/Kiln

Length of output: 8948


Adjust concern: background sync pausing won’t delay atomic_write commits/pushes.

BackgroundSync pauses based on _last_request_time, which is reset only by notify_request(), so calling notify_request() only once can let the poll loop go idle during long runs. However, GitSyncManager.atomic_write() itself acquires the write lock, ensures the repo is clean/fresh, and then commits and pushes on successful block exit—so pausing the background poller shouldn’t postpone the job’s own commit/push actions. The real risk is remote->local polling (freshness/remote changes) during the long-running writer, not write output being held until the job ends.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/git_sync/save_context.py` around lines 59 - 66, The current call
to BackgroundSync.notify_request() once is insufficient to prevent the poller
from treating a long-running GitSyncManager.atomic_write() as idle (because
BackgroundSync uses _last_request_time), so ensure the poller is truly paused
for the duration of the atomic write: locate where
GitSyncRegistry.get_background_sync(...) returns bg_sync and instead of a single
notify_request(), call a proper pause API on BackgroundSync (e.g.,
bg_sync.pause()) before invoking manager.atomic_write() and bg_sync.resume()
after the factory completes (or, if no pause API exists, add one that sets a
paused flag checked by the poll loop or maintain a scoped counter/lock in
BackgroundSync that prevents remote->local polling while atomic_write runs);
ensure you reference BackgroundSync, notify_request, _last_request_time,
GitSyncManager.atomic_write, and GitSyncRegistry.get_background_sync when making
the change.

Comment on lines +405 to +407
if task.cancel():
self._cancel_delivered.add(job_id)
try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Does asyncio.Task.cancel() guarantee that returning True means CancelledError will be raised in the task?

💡 Result:

No, asyncio.Task.cancel does not guarantee that a CancelledError will be raised in the task, nor does it guarantee that the task will be cancelled [1][2]. When you call task.cancel, it merely schedules a CancelledError to be thrown into the wrapped coroutine at the next opportunity when the event loop runs [1]. The coroutine has the ability to catch, handle, or suppress this exception using try/except or try/finally blocks [1][3]. If the coroutine suppresses the exception and does not re-raise it or call Task.uncancel, the task will not be marked as cancelled [1][3]. The return value of True from cancel simply indicates that the cancellation request was successfully scheduled; it does not mean the task is currently cancelled or will eventually be cancelled [1][2]. A task is only considered truly cancelled if the wrapped coroutine terminates with a CancelledError [1][2]. Therefore, relying on Task.cancel as an absolute guarantee of task termination is incorrect, as the task retains control over whether or not it honors the cancellation request [1].

Citations:


task.cancel() return value isn’t a reliable signal for “delivered” cancellation

The code uses task.cancel() to decide whether to record cancellation delivery, but asyncio.Task.cancel()’s True only means the cancellation request was successfully scheduled/requested—it does not guarantee the task will actually raise/propagate CancelledError (the coroutine can catch/suppress it or otherwise ignore cancellation). That makes _cancel_delivered.add(job_id) vulnerable to races around the subsequent await task/exception handling.

if task.cancel():
    self._cancel_delivered.add(job_id)
try:
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/registry.py` around lines 405 - 407, The code
should not treat task.cancel()'s True as proof the task was delivered cancelled;
instead call task.cancel() to request cancellation, then await the task and only
add job_id to self._cancel_delivered when you observe the cancellation actually
occurred (e.g., in the except asyncio.CancelledError: handler or after await if
task.cancelled() is True). Update the block around task.cancel(), the subsequent
await/try/except that awaits the asyncio.Task (the same Task object referenced
as task), and move the self._cancel_delivered.add(job_id) into the
CancelledError handling (or an explicit post-await check) so delivery is
recorded only upon confirmed cancellation.

Comment on lines +42 to +104
async def compute_state(self, params: EvalJobParams) -> JobDerivedState:
eval_config = eval_config_from_id(
params.project_id,
params.task_id,
params.eval_id,
params.eval_config_id,
)
eval, task = self._eval_and_task(eval_config)

# The eval-set filter defines the universe of dataset items in scope.
# EvalRunner only works items that BOTH pass this filter AND lack a
# matching EvalRun, so progress must be measured against this set.
filter = dataset_filter_from_id(eval.eval_set_filter_id)
in_filter_ids = {
task_run.id for task_run in task.runs(readonly=True) if filter(task_run)
}
total = len(in_filter_ids)

# Count only scored items that are still in the filter set. Items that
# were scored but later drifted out of the filter must not be counted,
# or success/is_complete would overcount and a resume could short-circuit
# to succeeded while real work remains.
scored_ids = {
run.dataset_id
for run in eval_config.runs(readonly=True)
if run.task_run_config_id == params.run_config_id
}
success = len(scored_ids & in_filter_ids)

return JobDerivedState(
total=total,
success=success,
error=0,
is_complete=success >= total,
)

async def run(self, params: EvalJobParams, ctx: JobContext) -> EvalJobResult:
# Baseline: items already scored (and still in-filter) before this run.
# EvalRunner only works the unfinished remainder, so its Progress counts
# are relative to that remainder. We add the baseline back so progress
# and the returned result are reported against the FULL eval-set size,
# not just the work left for this run.
baseline = await self.compute_state(params)
baseline_success = baseline.success

eval_runner = self._build_eval_runner(params)

success = baseline_success
total = baseline.total if baseline.total is not None else baseline_success
error = 0
async for progress in eval_runner.run():
# progress.total = full - baseline_success (the unfinished remainder),
# so baseline_success + progress.total = the full eval-set size.
success = baseline_success + progress.complete
total = baseline_success + progress.total
error = progress.errors
await ctx.report_progress(
success=success,
error=error,
total=total,
)

return EvalJobResult(total=total, success=success, error=error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚖️ Poor tradeoff

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# How runs(readonly=True) is implemented (sync disk read?)
ast-grep --pattern 'def runs($$$):
  $$$'
rg -nP --type=py -C3 '\bdef runs\s*\(' 

# Confirm registry schedules workers on the running loop (not a thread)
fd -t f registry.py -p 'jobs' --exec rg -nP -C3 'create_task|to_thread|run_in_executor|ensure_future'

Repository: Kiln-AI/Kiln

Length of output: 1221


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Inspect Task.runs(readonly=...) implementation body
FILE="libs/core/kiln_ai/datamodel/task.py"
echo "== $FILE: runs(...) implementation =="
rg -n "def runs\(" "$FILE"
start=$(rg -n "def runs\(" "$FILE" | head -n1 | cut -d: -f1)
# print ~120 lines around the runs implementation
sed -n "$((start-5)),$((start+140))p" "$FILE" | cat -n

# 2) Check if Eval.runs just delegates to Task.runs (and therefore same IO)
FILE_E="libs/core/kiln_ai/datamodel/eval.py"
echo "== $FILE_E: runs(...) =="
rg -n "def runs\(" "$FILE_E"
startE=$(rg -n "def runs\(" "$FILE_E" | head -n1 | cut -d: -f1)
sed -n "$((startE-5)),$((startE+80))p" "$FILE_E" | cat -n

# 3) Find where "registry.py" lives, then inspect scheduling of workers
echo "== locating registry.py =="

REGS=$(fd -t f "registry.py" .)
echo "$REGS"

# For each found registry.py, search scheduling primitives
for r in $REGS; do
  echo "== scheduling patterns in $r =="
  rg -n --hidden -S \
    "asyncio\.create_task|create_task\(|to_thread|run_in_executor|ensure_future|ThreadPoolExecutor" "$r" || true
done

# 4) Also search within studio_server jobs for scheduling patterns (broader than registry.py path guess)
echo "== scheduling patterns in studio_server/jobs =="
rg -n --hidden -S \
  "asyncio\.create_task|create_task\(|to_thread|run_in_executor|ensure_future|ThreadPoolExecutor" \
  app/desktop/studio_server/jobs || true

Repository: Kiln-AI/Kiln

Length of output: 13640


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect Task._runs implementation (likely the disk access)
FILE="libs/core/kiln_ai/datamodel/task.py"
echo "== locate _runs in $FILE =="
rg -n "def _runs\(" "$FILE"
start=$(rg -n "def _runs\(" "$FILE" | head -n1 | cut -d: -f1)
sed -n "$((start-20)),$((start+200))p" "$FILE" | cat -n

# Inspect base implementation of runs/filesystem handling if _runs lives in a base class
# Search for "_runs(" across datamodel
echo "== search for other _runs implementations =="
rg -n "def _runs\(" libs/core/kiln_ai/datamodel | head -n 50

# Inspect registry worker scheduling
REG="app/desktop/studio_server/jobs/registry.py"
echo "== inspect scheduling around create_task in $REG =="
sed -n "$((200)),$((280))p" "$REG" | cat -n

Repository: Kiln-AI/Kiln

Length of output: 114


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Find where Task._runs is defined (it's called by Task.runs)
echo "== find def _runs( in datamodel =="
rg -n --hidden "def _runs\s*\(" libs/core/kiln_ai/datamodel || true

echo "== if not in datamodel, search broader in libs/core/kiln_ai =="
rg -n --hidden "def _runs\s*\(" libs/core/kiln_ai || true

# 2) If we found candidate file(s), print a focused excerpt around each match
echo "== print excerpts around _runs definitions =="
FILES=$(rg -n --hidden "def _runs\s*\(" libs/core/kiln_ai -l || true)
for f in $FILES; do
  line=$(rg -n --hidden "def _runs\s*\(" "$f" | head -n1 | cut -d: -f1)
  echo "--- $f:$line ---"
  sed -n "$((line-30)),$((line+140))p" "$f" | cat -n
done

# 3) Inspect job worker execution path in studio_server/jobs/registry.py
REG="app/desktop/studio_server/jobs/registry.py"
echo "== inspect worker scheduling/dispatch in $REG =="
sed -n "1,340p" "$REG" | cat -n
echo "== look for where the worker 'run' is awaited/called =="
rg -n "worker\.(run|execute)|await .*worker|worker\(" "$REG"

Repository: Kiln-AI/Kiln

Length of output: 15592


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Search for any mention/definition of _runs =="
rg -n --hidden "_runs" libs/core/kiln_ai/datamodel | head -n 200

echo "== Search full core for '_runs(' implementation =="
rg -n --hidden "def\s+_runs\s*\(" libs/core/kiln_ai | head -n 50

echo "== Search for filesystem access used in runs-loading code =="
rg -n --hidden "open\(|os\.listdir\(|glob\.glob|Path\(|rglob\(|read_text\(|read_bytes\(" \
  libs/core/kiln_ai/datamodel | head -n 200

# Also locate the base class that Task inherits from (to find where _runs is defined)
echo "== Inspect Task class definition/inheritance =="
rg -n --hidden "class\s+Task\b" libs/core/kiln_ai/datamodel/task.py
python3 - <<'PY'
import re, pathlib
p=pathlib.Path("libs/core/kiln_ai/datamodel/task.py")
txt=p.read_text()
m=re.search(r"class\s+Task\s*\(([^)]*)\)\s*:", txt, re.M)
print("INHERITANCE:", m.group(1) if m else "NOT_FOUND")
PY

# Print the top of task.py where Task class is declared (imports/base)
sed -n '1,120p' libs/core/kiln_ai/datamodel/task.py | cat -n

Repository: Kiln-AI/Kiln

Length of output: 10092


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect Task relationship declaration for _runs
rg -n "_runs": libs/core/kiln_ai/datamodel/task.py
sed -n '110,170p' libs/core/kiln_ai/datamodel/task.py | cat -n

# Locate ParentOfRelationship definition and how it loads from filesystem
rg -n "class\s+ParentOfRelationship|ParentOfRelationship" libs/core/kiln_ai/datamodel
# Print relevant portions of basemodel where relationship/file access likely implemented
sed -n '650,900p' libs/core/kiln_ai/datamodel/basemodel.py | cat -n

# Specifically search for the method that backs _runs accessor (often something like _get_children/_load_on_disk)
rg -n "_get_|filesystem_name|readonly|Path\(|glob\(|open\(|read_text\(|read_bytes\(" libs/core/kiln_ai/datamodel/basemodel.py

Repository: Kiln-AI/Kiln

Length of output: 20630


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Find where JobRegistry._reconcile calls worker.compute_state (if at all)
REG="app/desktop/studio_server/jobs/registry.py"
echo "== locate _reconcile and compute_state usage in $REG =="
rg -n "def _reconcile|compute_state\(" "$REG"
# print around _reconcile
line=$(rg -n "async def _reconcile" "$REG" | head -n1 | cut -d: -f1 || true)
if [ -n "${line:-}" ]; then
  sed -n "$((line-40)),$((line+220))p" "$REG" | cat -n
fi

# 2) Locate iterate_children_paths_of_parent_path implementation and load_from_file open()
BASE="libs/core/kiln_ai/datamodel/basemodel.py"
echo "== locate iterate_children_paths_of_parent_path =="
rg -n "def iterate_children_paths_of_parent_path" "$BASE"
start=$(rg -n "def iterate_children_paths_of_parent_path" "$BASE" | head -n1 | cut -d: -f1)
sed -n "$((start-30)),$((start+120))p" "$BASE" | cat -n

echo "== locate load_from_file open() =="
rg -n "def load_from_file" "$BASE"
start2=$(rg -n "def load_from_file" "$BASE" | head -n1 | cut -d: -f1)
sed -n "$((start2-10)),$((start2+70))p" "$BASE" | cat -n

Repository: Kiln-AI/Kiln

Length of output: 17705


Offload compute_state filesystem scans from the event loop (verified blocking IO)

compute_state() calls task.runs(readonly=True) and eval_config.runs(readonly=True), which synchronously enumerate runs/ (os.scandir(...)) and synchronously load each child model (open(...)/read()/json.loads(...)) inside load_from_file(). JobRegistry schedules workers with asyncio.create_task(...) and directly awaits worker.compute_state()/worker.run() on the same event loop, so large eval/task run directories can stall progress/SSE updates.

Move the .runs(...) materialization (and related set-building) into asyncio.to_thread(...) / run_in_executor, keeping only the lightweight set math on the event loop.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/desktop/studio_server/jobs/workers/eval.py` around lines 42 - 104,
compute_state currently performs blocking filesystem/model loads via
task.runs(readonly=True) and eval_config.runs(readonly=True) which can stall the
event loop; move the materialization and set-building (the calls that iterate
and load runs and build in_filter_ids and scored_ids) into a thread/executor
(e.g. asyncio.to_thread or run_in_executor) and return only the computed
primitive totals/sets to the async function so compute_state does only
lightweight set math and returns JobDerivedState; specifically wrap the logic
that calls task.runs and eval_config.runs and constructs in_filter_ids and
scored_ids into a synchronous helper (or inline closure) executed via
asyncio.to_thread, then use its result to compute success/total/is_complete in
compute_state.

Comment on lines +431 to +444
<li class="menu-sm">
<button
type="button"
class="text-xs text-base-content/60"
on:click={() => jobs_dialog.open()}
aria-label="Background jobs"
>
<div class="sidebar-icon opacity-60">
<JobsIcon />
</div>
In progress
<SidebarJobsIndicator variant="inline" />
</button>
</li>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Redundant aria-label suppresses the job-count context for screen readers.

The button already has visible text ("In progress") and a nested SidebarJobsIndicator whose aria-label conveys the active count. Because an explicit aria-label on the button overrides all descendant content, assistive tech announces only "Background jobs" and drops the count. Consider dropping the redundant label so the computed name includes the visible text and indicator.

♿ Suggested change
         <button
           type="button"
           class="text-xs text-base-content/60"
           on:click={() => jobs_dialog.open()}
-          aria-label="Background jobs"
         >
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
<li class="menu-sm">
<button
type="button"
class="text-xs text-base-content/60"
on:click={() => jobs_dialog.open()}
aria-label="Background jobs"
>
<div class="sidebar-icon opacity-60">
<JobsIcon />
</div>
In progress
<SidebarJobsIndicator variant="inline" />
</button>
</li>
<li class="menu-sm">
<button
type="button"
class="text-xs text-base-content/60"
on:click={() => jobs_dialog.open()}
>
<div class="sidebar-icon opacity-60">
<JobsIcon />
</div>
In progress
<SidebarJobsIndicator variant="inline" />
</button>
</li>
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/web_ui/src/routes/`(app)/+layout.svelte around lines 431 - 444, Remove
the redundant aria-label on the button so the computed accessible name includes
the visible "In progress" text plus the nested SidebarJobsIndicator count;
locate the button element that calls jobs_dialog.open() (the one wrapping
JobsIcon and SidebarJobsIndicator) and delete the aria-label="Background jobs"
attribute, ensuring SidebarJobsIndicator still provides its own aria-label for
the active count if needed.

@leonardmq leonardmq marked this pull request as draft May 29, 2026 20:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant