feat: durable background execution + HITL suspend/resume schema #13633
Cristhianzl wants to merge 15 commits into
✅ Migration Validation Passed. All migrations follow the Expand-Contract pattern correctly.
ogabrielluiz left a comment
Hey @Cristhianzl, notes are inline, the checkpoint ones come with repros I ran against this branch. One I couldn't anchor inline: LE-1439 asks to either remove the now consumerless settings (background_backend_is_scaled, test_redis_url, background_watchdog_interval_s) or document that nothing on this slice consumes them. They still carry their original scaled backend docstrings in lfx settings/groups/runtime.py, and background_backend_is_scaled's docstring now contradicts the new service.py docstring that says this branch has no scaled backend.
    cleanup_tasks.add(cleanup_task)
    cleanup_task.add_done_callback(cleanup_tasks.discard)
    raise
except GraphPausedException:
I think the unwrapped re-raise doesn't survive the next frame: _run_vertex_build goes through JobService.execute_with_status, whose generic except catches GraphPausedException and writes the workflow job row to FAILED with finished_timestamp set before re-raising. I reproduced it with the seam test's setup plus an assertion on the durable row, and the job comes back FAILED. So a pause still terminalizes the run in the job table. Could you add a GraphPausedException branch in execute_with_status that marks the job SUSPENDED instead?
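Roughly the shape I have in mind, as a sketch only. The dict standing in for the job row, the local JobStatus enum and the local GraphPausedException are assumptions for illustration, not the real JobService types:

```python
import asyncio
from enum import Enum


class JobStatus(str, Enum):
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SUSPENDED = "suspended"


class GraphPausedException(Exception):
    """Stand-in for the real pause exception raised by the graph."""


async def execute_with_status(job: dict, run):
    """Run `run()` and record the outcome on a dict standing in for the job row."""
    job["status"] = JobStatus.IN_PROGRESS
    try:
        result = await run()
    except GraphPausedException:
        # A pause is not a failure: keep the row non-terminal and re-raise unwrapped.
        job["status"] = JobStatus.SUSPENDED
        raise
    except Exception:
        job["status"] = JobStatus.FAILED
        job["finished"] = True
        raise
    job["status"] = JobStatus.COMPLETED
    job["finished"] = True
    return result


async def _paused_run():
    raise GraphPausedException("waiting for human input")


def demo() -> dict:
    """Drive one paused run and return the resulting job row."""
    job: dict = {}
    try:
        asyncio.run(execute_with_status(job, _paused_run))
    except GraphPausedException:
        pass
    return job
```

The point is only the ordering: the pause branch has to sit before the generic except, and it must not set the finished marker.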
def _restore_model(module_name: str, class_name: str, payload: Any) -> Any:
    # Why: the module path is data read back from storage — importing arbitrary modules from it would be injection.
    if not module_name.startswith("lfx."):
I think there's an asymmetry here: serialize_value accepts any BaseModel and tags its module, but _restore_model refuses anything outside lfx.*. So a vertex result holding e.g. a langchain_core Document checkpoints fine and then every resume raises TypeError, which leaves the suspended run permanently stuck. Repro: serialize_value(Document(page_content='hello')) saves with module langchain_core.documents.base, and deserialize_value of that envelope raises. Could you apply the non-lfx check at serialize time and degrade to None there, like the docstring already promises for opaque values? That way an unresumable checkpoint can never be written.
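A minimal sketch of the symmetric check, assuming a plain-object stand-in for BaseModel. serialize_model, the envelope keys and the two dynamically built classes are hypothetical and only there to make the example runnable without lfx or langchain installed:

```python
from typing import Any, Optional

ALLOWED_PREFIX = "lfx."  # the same prefix the restore side enforces


def serialize_model(value: Any) -> Optional[dict]:
    """Return a wire envelope, or None when the value could never be restored."""
    module = type(value).__module__
    if not module.startswith(ALLOWED_PREFIX):
        # Degrade at write time so an unresumable checkpoint is never stored.
        return None
    return {
        "kind": "model",
        "module": module,
        "name": type(value).__qualname__,
        "value": dict(vars(value)),  # stand-in for model_dump(mode="json")
    }


# Hypothetical classes: one pretends to live under lfx.*, the other does not.
InternalData = type("InternalData", (), {"__module__": "lfx.schema.data"})
ExternalDocument = type(
    "ExternalDocument", (), {"__module__": "langchain_core.documents.base"}
)

internal = InternalData()
internal.text = "hello"
external = ExternalDocument()
external.page_content = "hello"
```

With this, the external object degrades to None on save instead of raising on every resume.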
built_object_wire: dict[str, Any] | None = None
raw = vertex.built_object
if vertex.built and raw is not None and not isinstance(raw, (UnbuiltObject, UnbuiltResult)):
    built_object_wire = serialize_value(raw)
This guard only fires when built_object itself is opaque, but built_object is usually a dict of outputs, and serialize_value(dict) turns an opaque member into None without bubbling up. Repro: built_object={'dataframe': DataFrame(...)} passes build_checkpoint, the checkpoint stores {'dataframe': None}, and the resumed vertex comes back built=True with built_object={'dataframe': None}, so downstream vertices read None. DataFrame outputs are first class in langflow, so this is the common case, exactly the corruption the comment above says this raise prevents. I think the None check needs to recurse into dict values, and results/artifacts on lines 32-33 have the same gap.
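One way the recursive check could look, as a sketch. lost_value is a hypothetical helper name; it compares the raw value with its serialized form so that a None that was already in the original data is not mistaken for a degraded member:

```python
from typing import Any


def lost_value(raw: Any, wire: Any) -> bool:
    """True if serialization turned a non-None value into None at any depth."""
    if raw is not None and wire is None:
        return True
    if isinstance(raw, dict) and isinstance(wire, dict):
        return any(lost_value(value, wire.get(key)) for key, value in raw.items())
    if isinstance(raw, (list, tuple)) and isinstance(wire, (list, tuple)):
        return any(lost_value(r, w) for r, w in zip(raw, wire))
    return False
```

The same helper could be applied to results and artifacts, since they are dicts with the same gap.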
_WIRE_KIND: "model",
"module": type(value).__module__,
"name": type(value).__qualname__,
"value": value.model_dump(mode="json"),
model_dump(mode='json') raises PydanticSerializationError when the model holds a non-serializable member, e.g. Data(data={'client': SomeClient()}), and there's no try/except between here and check_and_handle_pause, so the pause attempt turns the run into a failure. That contradicts the degrade-to-None contract in this function's docstring. Components stash arbitrary objects in Data.data all the time. Could you catch the serialization error here and degrade?
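Something like the following, as a sketch. FakeModel and SerializationError are stand-ins I made up so the example runs without pydantic; in the real code the except clause would name the pydantic serialization error instead:

```python
from typing import Optional


class SerializationError(Exception):
    """Stand-in for the error pydantic raises on a non-serializable member."""


class FakeModel:
    """Hypothetical model whose model_dump fails on non-JSON members."""

    def __init__(self, data: dict):
        self.data = data

    def model_dump(self, mode: str = "json") -> dict:
        for key, member in self.data.items():
            if not isinstance(member, (str, int, float, bool, type(None))):
                raise SerializationError(f"cannot serialize {key!r}")
        return {"data": dict(self.data)}


def dump_or_degrade(value: FakeModel) -> Optional[dict]:
    """Serialize the model, degrading to None instead of failing the pause."""
    try:
        return value.model_dump(mode="json")
    except SerializationError:
        return None
```

That keeps the degrade-to-None contract from the docstring and lets the pause proceed.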
def restore_graph_from_checkpoint(checkpoint: GraphCheckpoint, *, store: CheckpointStore | None = None) -> Graph:
    from lfx.graph.graph.base import Graph

    graph = Graph.from_payload(checkpoint.flow_payload, flow_id=checkpoint.flow_id)
inactivated_vertices is captured in the checkpoint but never applied back here, and VertexCheckpointData doesn't carry vertex.state, so everything resumes ACTIVE and compute_resume_layer queues branches a ConditionalRouter had stopped. Repro: mark_vertex('chat_output', 'INACTIVE'), checkpoint, resume: the resumed graph has inactivated_vertices == set() and compute_resume_layer returns ['chat_output']. Could you reapply the inactivated/activated sets and vertex state in restore_graph_from_checkpoint?
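A sketch of the reapply step, using small dataclasses as stand-ins for the real Graph, Vertex and checkpoint models. reapply_inactive and resume_layer are hypothetical names, and the string states are an assumption:

```python
from dataclasses import dataclass, field


@dataclass
class Vertex:
    id: str
    state: str = "ACTIVE"


@dataclass
class Graph:
    vertices: dict
    inactivated_vertices: set = field(default_factory=set)


@dataclass
class Checkpoint:
    inactivated_vertices: list


def reapply_inactive(graph: Graph, checkpoint: Checkpoint) -> None:
    """Put the stopped branches back into the state they had at pause time."""
    for vertex_id in checkpoint.inactivated_vertices:
        vertex = graph.vertices.get(vertex_id)
        if vertex is None:
            continue  # vertex no longer exists in the flow payload
        vertex.state = "INACTIVE"
        graph.inactivated_vertices.add(vertex_id)


def resume_layer(graph: Graph, candidates: list) -> list:
    """Only vertices that are still ACTIVE may be queued on resume."""
    return [v for v in candidates if graph.vertices[v].state == "ACTIVE"]
```

With the sets reapplied, the branch a router had stopped is no longer queued after resume.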
    }
)
manager.vertices_to_run.update(checkpoint.vertices_being_run)
graph.run_manager = manager
RunnableVerticesManager.to_dict doesn't include cycle_vertices, so this assignment replaces the manager prepare() populated with one whose cycle set is empty, and a resumed flow with a loop never schedules its loop vertices (process returns with unbuilt vertices, silently). Repro: m.add_to_cycle_vertices('x'); RunnableVerticesManager.from_dict(m.to_dict()).cycle_vertices == set(). I think the simplest fix is re-adding the prepared manager's cycle_vertices after this assignment.
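To illustrate the round-trip loss and the re-add, here is a sketch with a toy Manager class. It mimics the behaviour described above (cycle_vertices missing from to_dict) and is not the real RunnableVerticesManager; restore_manager is a hypothetical helper:

```python
class Manager:
    """Toy manager whose to_dict omits cycle_vertices, as in the repro."""

    def __init__(self):
        self.vertices_to_run = set()
        self.cycle_vertices = set()

    def to_dict(self) -> dict:
        return {"vertices_to_run": sorted(self.vertices_to_run)}

    @classmethod
    def from_dict(cls, data: dict) -> "Manager":
        manager = cls()
        manager.vertices_to_run = set(data["vertices_to_run"])
        return manager


def restore_manager(prepared: Manager, saved: dict) -> Manager:
    """Rebuild from the checkpoint, then carry over the prepared cycle set."""
    restored = Manager.from_dict(saved)
    restored.cycle_vertices = set(prepared.cycle_vertices)
    return restored


prepared = Manager()
prepared.cycle_vertices.add("loop")
prepared.vertices_to_run.add("chat_output")
saved = prepared.to_dict()
```

Carrying cycle_vertices in to_dict/from_dict would also work; the re-add just touches less code.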
    vertex = graph.get_vertex(vertex_id)
except ValueError:
    continue
vertex.built = vertex_data.built
LE-1440 also asks for the checkpoint-restored-vertex guard in build result collection so a resumed vertex doesn't re-consume its async generator, and I don't see it: this restores results/artifacts/built_object/built_result but not vertex.result, and the consume_async_generator site in graph base is unchanged. Is that landing in a later ticket or did it get dropped?
store = get_checkpoint_service()
await store.save(checkpoint)
info = self.pause_info or {}
raise GraphPausedException(
LE-1440 wants this to re-raise unwrapped at both the build driver and the lfx graph run boundary, but Graph._run still wraps everything in ValueError (around line 910), so a pause through Graph.arun comes out as 'Error running graph: ...' with the pause as cause. I verified it: arun with a 'pause' probe raises ValueError, not GraphPausedException. Could you add the GraphPausedException/CancelledError re-raise before the generic except in _run too?
    op.execute("ALTER TYPE execution_signal_type_enum ADD VALUE IF NOT EXISTS 'pause'")
    op.execute("ALTER TYPE execution_signal_type_enum ADD VALUE IF NOT EXISTS 'resume'")
else:
    with op.batch_alter_table("execution_signals", recreate="always") as batch_op:
LE-1438 says the SQLite upgrade body is a no-op since the enums are stored as VARCHAR, but this branch does a full table rebuild with recreate='always'. The base migration created signal_type via plain sa.Enum and create_constraint defaults to False in SQLAlchemy 2.x, so there's no CHECK constraint to widen and the rebuild is dead DDL. It's also inconsistent: if execution_signals needed a rebuild, job.status would too, and that's left untouched. Could you make the SQLite branch a pass?
    await svc.stop()


async def test_submit_runs_in_process_when_job_queue_type_redis(active_user, monkeypatch):
The F1 acceptance criterion asks for a real flow here (connected components wired via .set(), run through the facade to a terminal state), but this passes a scripted frame source. It proves the import path is safe, not the real build path. Could you swap it for a real trivial flow?
erichare left a comment
Overall
First — this is a genuinely strong piece of work. Durable suspend/resume layered over a background-execution substrate is one of the harder things to get right in this codebase, and the architecture holds up well in the places that matter most:
- The pause/resume control flow is well-designed. The PauseRequested/GraphPausedException split is clean: GraphPausedException is caught in generate_flow_events, converted to a human_input_required frame, and re-raised as PauseRequested for the runner to route into _suspend(). The pause boundary is correctly placed between complete batch layers, so there's no partial-layer race.
- The concurrency primitives are right. claim_suspended_for_resume is a proper atomic single-flight conditional UPDATE (only one caller wins the resume), background task references are held so nothing gets GC'd mid-flight, and the durable-replay ordering is sound.
- The migrations are well-formed: linear revision chain, the Postgres ALTER TYPE … ADD VALUE IF NOT EXISTS inside an autocommit block is the correct pattern, and the SQLite batch-alter preserves existing data.
- Security is solid. The resume endpoint uses a correct 404-masked ownership check (no IDOR), and checkpoint deserialization goes through JsonPlusSerializer with pickle_fallback=False, so there is no arbitrary-deserialization RCE surface.
- The canvas perf fix (LE-1424) is exactly the right approach: writing HITL messages into the React Query cache via setQueryData with session-scoped keys instead of firing network invalidations during SSE processing is precisely what kills the unbounded-refetch growth.
Three of the eight stories — LE-1441, LE-1451, LE-1424 — I'd consider review-clean as they stand.
The blockers below are concentrated in error / edge paths (resume-after-expiry, suspend-time DB failures, non-serializable built objects, loop flows) plus a couple of duplicate-state bugs. None are architectural — they're all localized fixes. This review focuses purely on code correctness; I've set aside the draft/merge-conflict status since that's known and orthogonal.
Summary: 10 issues — 7 blocking, 3 high — across 5 of the 8 stories.
LE-1438 — SUSPENDED status + PAUSE/RESUME signals
🔴 Blocking — duplicate concurrent run while paused. src/backend/base/langflow/services/jobs/service.py (~L131–152, and the mirror _existing_job_for_dedupe ~L188). The dedupe guard checks only {QUEUED, IN_PROGRESS, COMPLETED} — SUSPENDED is absent. A client retrying the same idempotency_key while a run is paused at a HITL node bypasses dedupe and launches a second concurrent execution of the same flow; the two runs then race the resume decision and can corrupt shared state. Fix: add JobStatus.SUSPENDED to both .in_([...]) lists.
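A sketch of the dedupe guard with SUSPENDED included. The in-memory job list and the existing_job_for_dedupe helper are assumptions standing in for the real query with its .in_([...]) filter:

```python
from typing import Optional

# Statuses that should block a second run with the same idempotency key.
DEDUPE_STATUSES = frozenset({"QUEUED", "IN_PROGRESS", "SUSPENDED", "COMPLETED"})


def existing_job_for_dedupe(jobs: list, idempotency_key: str) -> Optional[dict]:
    """Return the job a retry should be folded into, if there is one."""
    for job in jobs:
        if job["idempotency_key"] == idempotency_key and job["status"] in DEDUPE_STATUSES:
            return job
    return None


jobs = [
    {"id": "a", "idempotency_key": "k1", "status": "SUSPENDED"},
    {"id": "b", "idempotency_key": "k2", "status": "FAILED"},
]
```

A retry with key k1 now resolves to the paused job instead of starting a second run, while the failed k2 job can still be retried.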
🟠 High — checkpoint rows leak on deadline timeout. service.py sweep_input_deadlines (~L667–695) transitions SUSPENDED → FAILED but never calls delete_checkpoint. The explicit-cancel path does clean up, but the timeout path orphans a job_checkpoints row. Because GraphCheckpoint.expires_at is never set (_expired() is always False), JobScopedCheckpointStore._all() keeps full-scanning these stale rows, violating the documented "one row per actively-suspended job" invariant. Fix: await delete_checkpoint(job_id, "graph") per reconciled job in the sweep.
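A sketch of the sweep with the cleanup added. The dicts standing in for the job table and the job_checkpoints table, and the numeric deadline field, are assumptions for illustration:

```python
def sweep_input_deadlines(jobs: dict, checkpoints: dict, now: float) -> list:
    """Fail suspended jobs past their deadline and drop their checkpoint rows."""
    reconciled = []
    for job_id, job in jobs.items():
        deadline = job.get("deadline")
        if job["status"] == "SUSPENDED" and deadline is not None and deadline <= now:
            job["status"] = "FAILED"
            # Same cleanup the explicit-cancel path already performs.
            checkpoints.pop((job_id, "graph"), None)
            reconciled.append(job_id)
    return reconciled


jobs = {
    "late": {"status": "SUSPENDED", "deadline": 10.0},
    "waiting": {"status": "SUSPENDED", "deadline": 99.0},
}
checkpoints = {("late", "graph"): b"...", ("waiting", "graph"): b"..."}
```

After the sweep, only jobs that are still suspended keep a checkpoint row, which restores the one-row-per-suspended-job invariant.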
LE-1439 — Durable single-node background-execution slice
🔴 Blocking — partial _suspend() failure hangs the client indefinitely. src/backend/base/langflow/services/background_execution/runner.py (~L102–128). If _suspend() raises inside the except PauseRequested handler (e.g. a transient DB error in append_event/update_job_status), the exception jumps past the sibling except Exception straight to finally, where paused=True skips bus.close() and all terminal reconciliation. The job stays IN_PROGRESS, the live bus stays open, _is_terminal() never trips, and the attached SSE client hangs forever. With the default background_input_deadline_s=None there's no in-process recovery — only a server restart clears it. Fix: wrap _suspend() so a failure still reconciles to a terminal/clean state and closes the bus.
🟠 High — human decision permanently lost if _enqueue fails on resume. services/background_execution/service.py (~L317–325). After claim_suspended_for_resume irreversibly flips SUSPENDED → IN_PROGRESS and persists the RESUME signal, _enqueue runs with no try/except. If it raises (e.g. Executor is closed during shutdown), the job is stranded IN_PROGRESS, eventually swept to FAILED, and the resume endpoint returns 409 on retry — the user's decision is unrecoverable. Fix: guard _enqueue; on failure, roll the job back to SUSPENDED so resume can be retried.
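A sketch of the guarded enqueue. The resume function, the dict job row and the flaky executor are hypothetical, and the real rollback would need to be a conditional UPDATE in the same spirit as the claim:

```python
def resume(job: dict, enqueue) -> None:
    """Claim a suspended job and enqueue it, undoing the claim on failure."""
    if job["status"] != "SUSPENDED":
        raise RuntimeError("job is not suspended")  # the endpoint would answer 409
    job["status"] = "IN_PROGRESS"
    try:
        enqueue(job)
    except Exception:
        # Roll back so the user's decision can be submitted again.
        job["status"] = "SUSPENDED"
        raise


def make_flaky_enqueue():
    """Hypothetical executor that is closed on the first call only."""
    calls = []

    def enqueue(job):
        calls.append(job)
        if len(calls) == 1:
            raise RuntimeError("Executor is closed")

    return enqueue, calls


job = {"status": "SUSPENDED"}
enqueue, calls = make_flaky_enqueue()
try:
    resume(job, enqueue)
except RuntimeError:
    status_after_failure = job["status"]
resume(job, enqueue)  # the retry succeeds because the claim was rolled back
```

Without the rollback, the second resume call would hit the not-suspended branch and the decision would be lost.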
LE-1440 — Graph checkpoint model + layer boundary
🔴 Blocking — loop flows deadlock on resume. src/lfx/src/lfx/graph/checkpoint/resume.py (~L32–50). prepare() correctly populates run_manager.cycle_vertices, then _restore_run_manager() overwrites run_manager with a fresh RunnableVerticesManager.from_dict() — and neither to_dict/from_dict carries cycle_vertices. After the swap it's an empty set, so any flow containing a Loop component never schedules its loop vertex on resume and hangs. Fix: repopulate from graph.cycle_vertices immediately after from_dict().
🔴 Blocking — LLM-then-HITL (the primary use case) fails every time. src/lfx/src/lfx/graph/checkpoint/builder.py (~L45–53). _vertex_data raises a bare TypeError when a built_object isn't JSON-serializable (e.g. a BaseChatModel wrapping an httpx.AsyncClient). That TypeError escapes check_and_handle_pause as a TypeError rather than GraphPausedException, so the suspend path is bypassed and the run is finalized FAILED with no checkpoint saved. The tests only cover cleanly-serializable ChatInput/ChatOutput, so the LLM-before-pause path is untested and currently broken. Fix: have the non-serializable case degrade into the pause path (or skip opaque built objects) rather than hard-raise.
LE-1442 — Non-terminal pause branch
🔴 Blocking — infinite recursion crashes resume after checkpoint expiry. src/backend/base/langflow/api/build.py (~L382–409). When build_resumed_graph_and_get_order gets None from store.load_by_run_id (expired/missing checkpoint, or a restart that cleared an in-memory store), it falls back to build_graph_and_get_order, which immediately re-dispatches back because resume/job_id are still set → mutual recursion to RecursionError. Every resume-after-TTL hits this and surfaces an opaque error. Fix: don't re-enter the dispatcher; raise a 404-style "checkpoint expired/not found".
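A sketch of the non-recursive shape. The function names mirror the ones above, but the signatures, the dict store and CheckpointNotFound are assumptions; the HTTP layer would map that error to a 404-style response:

```python
from typing import Optional


class CheckpointNotFound(LookupError):
    """Raised instead of falling back into the dispatcher."""


def build_resumed_graph_and_get_order(store: dict, job_id: str):
    checkpoint = store.get(job_id)
    if checkpoint is None:
        # Do not call build_graph_and_get_order here: resume/job_id are still
        # set, so it would dispatch straight back and recurse.
        raise CheckpointNotFound(f"checkpoint for job {job_id} expired or not found")
    return ("resumed", checkpoint)


def build_graph_and_get_order(store: dict, *, resume: bool = False, job_id: Optional[str] = None):
    if resume and job_id is not None:
        return build_resumed_graph_and_get_order(store, job_id)
    return ("fresh", None)


def try_resume(store: dict, job_id: str) -> str:
    """Return the build kind, or the error name if the checkpoint is gone."""
    try:
        return build_graph_and_get_order(store, resume=True, job_id=job_id)[0]
    except CheckpointNotFound as exc:
        return type(exc).__name__
```

A missing checkpoint then surfaces as one clear error rather than a RecursionError.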
🔴 Blocking — primary pause-seam test can never pass (CI gate). src/backend/tests/unit/api/test_build_pause_seam.py (~L78–93). test_build_path_pause_persists_checkpoint_and_propagates_unwrapped asserts GraphPausedException propagates unwrapped out of generate_flow_events, but the implementation catches it, emits human_input_required, and returns — so error is always None and the assertion fails on every run. The test contradicts the (correct) implementation. Fix: assert on the emitted human_input_required event + persisted checkpoint instead of an escaping exception.
LE-1603 — Canvas-side HITL badge + popover
🟠 High — stale HITL state leaks across flow navigation. src/frontend/src/controllers/API/agui/use-restore-canvas-hitl.ts (~L23–41). The restore effect calls setPending when it finds an unanswered card but never calls hitlStore.clear() when it finds none. The store holds a single unscoped {nodeId, content, job_id} slot, so leaving a suspended Flow A for Flow B keeps A's pending state; submitting a card on B then clears the wrong flow's state, and A's badge won't restore on return. Fix: clear the store (or scope it by flowId) when the scan finds no active card.
Shared agent substrate (epic LE-1437)
🔴 Blocking — duplicate chat messages on every tool-approval. src/lfx/src/lfx/components/models_and_agents/agent.py (~L743–750 and ~L822–823). On suspend, _suspend_for_tool_approval returns the agent message that was already persisted as state="partial" but never deletes it (unlike the ExceptionWithMessageError path, which does delete_message + remove_message). On resume, run_agent builds a fresh message that gets persisted as a second row. Result: two rows per HITL turn — one stuck partial, one complete — both rendered in the chat, accumulating with every interaction. Fix: delete/replace the partial message on suspend, or reuse its id on resume.
Non-blocking notes (FYI, not gating)
- LE-1441: the migration adds a redundant non-unique ix_job_checkpoints_id index on the PK column; minor write/storage overhead, worth dropping.
- LE-1451: WORKFLOWS_ENDPOINT was hardcoded to /api/v2/workflows; fine for OSS, but enterprise forks that override BASE_URL_API_V2 or rely on fetch-credential injection would need getFetchCredentials() in consumeBackgroundEvents.
- LE-1439: output_events was dropped from the set_result payload, which changes GET /jobs/{id}/result (no more outputs); please confirm that's intentional vs. an accidental regression.
- Latent (not reachable today): JobScopedCheckpointStore._all() scans all users' checkpoint rows with no user_id predicate. No API path reaches it currently, but it'd become a cross-tenant leak if list_by_session/load is ever wired to a user-facing route; worth a user_id filter before that happens.
Really nice feature overall — the hard parts (atomic resume, the pause boundary, the durable substrate, the perf fix) are done right. Happy to pair on any of the edge-path fixes.
Objective
Open the human-in-the-loop groundwork (LE-1437) on release-1.11.0: a durable background-execution substrate whose jobs can later suspend for human input and resume without losing state.