feat(core): migrate Durable Agent to evented workflow engine#17716
feat(core): migrate Durable Agent to evented workflow engine#17716taofeeq-deru wants to merge 89 commits into
Conversation
…astra-ai/mastra into agentic-loop-evented-workflows
The agentic-loop modules no longer statically import `workflows/evented` (that formed an ESM init-time cycle with the base `Workflow` class). They build evented workflows via the `createEventedWorkflow` registry instead, so the evented module must be loaded before those factories run. Load it with a dynamic import on the agent execute path and the loop stream path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The agentic loop now runs on the evented workflow engine, which requires a Mastra with pubsub and running workers. The e2e test used MastraLLMVNext standalone with no Mastra, so every case failed with "Mastra instance with pubsub is required". Provide one in beforeAll and register it on the model. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The agentic loop runs on the evented workflow engine, which needs a pubsub-equipped Mastra with running workers and an atomic-capable storage adapter. Two tests used mocks too thin to satisfy it: - agent-fga "no FGA provider configured" registered a hand-rolled Mastra mock with no startWorkers; drop it so the agent uses its ephemeral Mastra. - runEvals "save scores to storage" used a partial storage mock whose workflows store lacks atomic operations; use a real InMemoryStore and spy on the real scores store. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… loop A signal sent to an already-running agent could be folded into the run's first model request instead of getting its own turn. The evented engine registers a run as "active" before its first model request, so `llm-execution-step`'s pre-first-request drain would grab a follow-up signal that arrived in that window — collapsing two turns into one. Split the queue in two: signals sent to a reserved (not-yet-started) run stay folded into the first request, while signals sent to an active run are drained as separate turns by `signalDrainStep`. `sendSignal` already distinguishes the cases — route them to `preRunSignalsByThread` vs `pendingSignalsByThread`, and have `drainPendingSignals` take a `scope` to read the right one. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Evented workflow runs dropped the observability context: `EventedRun.start` ignored it, so the event processor never built a `tracingContext` and every step's `params.tracingContext` was undefined. Spans created inside an evented workflow — the model, tool, and processor spans of an agent run — were orphaned and never exported, truncating the trace at `agent_run`. `currentSpan` is a non-serializable AISpan, so it cannot ride the engine's pubsub events. Hold the run's tracing context on Mastra keyed by runId (`__registerRunTracingContext`): `EventedRun.start` records it, and the event processor reads it when dispatching each step — walking the parent-run chain so nested workflow runs inherit their parent's span. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… engine The evented workflow engine never created its own workflow/step spans the way the default engine does, so agent traces viewed with internal spans shown were missing the `execution-workflow` / `agentic-loop` run spans and the per-step spans. `EventedRun.start` now creates a WORKFLOW_RUN span and registers it as the run's tracing context, so steps nest under it. The step executor creates a WORKFLOW_STEP span per step. Both honor the workflow's `tracingPolicy`, so internal workflows stay hidden unless internal spans are explicitly shown. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ontext When a nested workflow's initial snapshot is persisted, include the parent step's prevResult output as the input key in the context. This prevents updateWorkflowResults from later overwriting the event's stepResults (which contains input) with storage's empty context. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
🦋 Changeset detectedLatest commit: 4dcabd2 The changes in this PR will be included in the next version bump. This PR includes changesets to release 20 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
1 Skipped Deployment
|
PR triageLinked issue check skipped for core contributor @taofeeq-deru. PR complexity score
Applied label: Changed test gateChanged tests failed against the base branch as expected. Label: |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
WalkthroughThis PR migrates Durable Agent's internal workflow engine from synchronous to evented execution by adding runtime evented module loading, ephemeral Mastra instance resolution, internal workflow registration and teardown, switching agentic workflows to the evented engine, and updating tests to deterministically drain streams. No public API changes. ChangesDurable Agent Evented Workflow Migration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsStopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (5)
packages/core/src/agent/durable/__tests__/resume-api.test.ts (1)
260-266: ⚡ Quick winConsider adding explicit timeout or assertion to the polling loop.
The polling loop waits for cached events but breaks silently after 100 iterations if no events arrive. If the evented engine fails to cache events, the test continues and only fails later at line 274 (or 308) when asserting
cachedEvents.length > 0, making the root cause harder to diagnose.Consider adding an assertion or throwing an error if the loop completes without finding events:
for (let i = 0; i < 100; i++) { await new Promise(resolve => setTimeout(resolve, 50)); const events = await cachingPubsub.getHistory(topic); if (events.length > 0) break; } +const cachedAfterWait = await cachingPubsub.getHistory(topic); +expect(cachedAfterWait.length).toBeGreaterThan(0);This provides clearer failure messages and catches issues at the polling site.
Also applies to: 296-302
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/agent/durable/__tests__/resume-api.test.ts` around lines 260 - 266, The polling loop that waits for cached events (using the topic variable `const topic = \`agent.stream.${runId}\`` and `cachingPubsub.getHistory(topic)`) may exit silently after 100 iterations; modify the loop to fail fast by adding an explicit timeout/maximum-wait assertion: after the loop completes without finding events, throw an Error or call an assertion like `assert.fail` with a clear message that no cached events arrived for the topic and include the runId/topic in the message; apply the same change to the second polling block that uses `cachingPubsub.getHistory` at the 296-302 site so tests report the polling timeout immediately.packages/core/src/agent/durable/__tests__/durable-agent-background-tasks.test.ts (1)
169-181: ⚡ Quick winInconsistent stream cleanup pattern across background-tasks tests.
This test (and several others at lines 209-223, 260-288, 323-341, 392-421, 496-511) uses a fixed
setTimeoutfollowed by immediatecleanup(), whereas the request-context and stopwhen test files in this PR consistently drain streams before cleanup viaawait drain(result.fullStream)orawait output.consumeStream().Since the evented engine is async, fixed timeouts are non-deterministic and may not guarantee the stream has finished processing. Consider updating these tests to explicitly drain the stream before calling
cleanup()for consistency and determinism.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/agent/durable/__tests__/durable-agent-background-tasks.test.ts` around lines 169 - 181, The test uses a fixed setTimeout before calling cleanup on the stream returned by durableAgent.stream, which is non-deterministic; replace the timeout-based wait with an explicit stream drain/consume step: after calling durableAgent.stream('Research quantum', {...}) capture the returned stream (or result.fullStream / result) and await a drain utility (e.g., await drain(result.fullStream) or await output.consumeStream()) to ensure all events are processed before calling cleanup(); apply the same change to the other background-tasks tests referenced (the blocks around lines 209-223, 260-288, 323-341, 392-421, 496-511) so they all use deterministic stream draining instead of setTimeout.packages/core/src/agent/durable/durable-agent.ts (2)
469-471: 💤 Low valueRedundant
startWorkers()call.
#getEffectiveMastra()already callsawait ephemeral.startWorkers()at line 341 when creating the ephemeral instance. The second call here is redundant for the ephemeral case, and for the user's Mastra (returned directly from line 327), workers should already be started. Consider removing the duplicate call.♻️ Suggested fix
const effectiveMastra = await this.#getEffectiveMastra(); - await effectiveMastra.startWorkers(); effectiveMastra.__registerInternalWorkflow(workflow as any, runId);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/agent/durable/durable-agent.ts` around lines 469 - 471, Remove the redundant await effectiveMastra.startWorkers() call here and rely on `#getEffectiveMastra`() to ensure workers are started for both ephemeral and user Mastra instances; keep the subsequent effectiveMastra.__registerInternalWorkflow(workflow as any, runId) invocation unchanged. Locate the code around `#getEffectiveMastra`, startWorkers, effectiveMastra and remove the extra startWorkers invocation so workers are not started twice.
724-726: 💤 Low valueSame redundant
startWorkers()call in resume path.Similar to
executeWorkflow(),#getEffectiveMastra()already starts workers when creating the ephemeral instance.♻️ Suggested fix
const effectiveMastra = await this.#getEffectiveMastra(); - await effectiveMastra.startWorkers(); effectiveMastra.__registerInternalWorkflow(workflow as any, runId);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/agent/durable/durable-agent.ts` around lines 724 - 726, The resume path is calling effectiveMastra.startWorkers() redundantly because `#getEffectiveMastra`() already starts workers when creating the ephemeral Mastra; remove the await effectiveMastra.startWorkers() call and keep the subsequent effectiveMastra.__registerInternalWorkflow(workflow as any, runId) registration so the code uses the already-initialized worker state from `#getEffectiveMastra`().packages/core/src/agent/durable/workflows/create-durable-agentic-workflow.ts (1)
114-136: 💤 Low valueConsider extracting the duplicated
shouldPersistSnapshotpredicate.The same persistence logic appears in both
singleIterationWorkflow(lines 119-129) and the main agentic loop workflow (lines 222-232). Extracting to a shared constant would reduce maintenance burden if the persistence rules change.const shouldPersistSnapshot: typeof params => boolean = params => params.workflowStatus === 'pending' || params.workflowStatus === 'paused' || params.workflowStatus === 'suspended';Also applies to: 217-238
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/agent/durable/workflows/create-durable-agentic-workflow.ts` around lines 114 - 136, Extract the duplicated predicate into a single exported constant (e.g., shouldPersistSnapshot) and use it in both createEventedWorkflow calls (the one with id DurableStepIds.AGENTIC_EXECUTION / singleIterationWorkflow and the main agentic loop workflow) instead of duplicating the inline function; implement the constant with the same logic (return true when params.workflowStatus is 'pending' || 'paused' || 'suspended') and update the options.shouldPersistSnapshot references to call that shared constant so maintenance of persistence rules is centralized.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@packages/core/src/agent/durable/__tests__/durable-agent-background-tasks.test.ts`:
- Around line 169-181: The test uses a fixed setTimeout before calling cleanup
on the stream returned by durableAgent.stream, which is non-deterministic;
replace the timeout-based wait with an explicit stream drain/consume step: after
calling durableAgent.stream('Research quantum', {...}) capture the returned
stream (or result.fullStream / result) and await a drain utility (e.g., await
drain(result.fullStream) or await output.consumeStream()) to ensure all events
are processed before calling cleanup(); apply the same change to the other
background-tasks tests referenced (the blocks around lines 209-223, 260-288,
323-341, 392-421, 496-511) so they all use deterministic stream draining instead
of setTimeout.
In `@packages/core/src/agent/durable/__tests__/resume-api.test.ts`:
- Around line 260-266: The polling loop that waits for cached events (using the
topic variable `const topic = \`agent.stream.${runId}\`` and
`cachingPubsub.getHistory(topic)`) may exit silently after 100 iterations;
modify the loop to fail fast by adding an explicit timeout/maximum-wait
assertion: after the loop completes without finding events, throw an Error or
call an assertion like `assert.fail` with a clear message that no cached events
arrived for the topic and include the runId/topic in the message; apply the same
change to the second polling block that uses `cachingPubsub.getHistory` at the
296-302 site so tests report the polling timeout immediately.
In `@packages/core/src/agent/durable/durable-agent.ts`:
- Around line 469-471: Remove the redundant await effectiveMastra.startWorkers()
call here and rely on `#getEffectiveMastra`() to ensure workers are started for
both ephemeral and user Mastra instances; keep the subsequent
effectiveMastra.__registerInternalWorkflow(workflow as any, runId) invocation
unchanged. Locate the code around `#getEffectiveMastra`, startWorkers,
effectiveMastra and remove the extra startWorkers invocation so workers are not
started twice.
- Around line 724-726: The resume path is calling effectiveMastra.startWorkers()
redundantly because `#getEffectiveMastra`() already starts workers when creating
the ephemeral Mastra; remove the await effectiveMastra.startWorkers() call and
keep the subsequent effectiveMastra.__registerInternalWorkflow(workflow as any,
runId) registration so the code uses the already-initialized worker state from
`#getEffectiveMastra`().
In
`@packages/core/src/agent/durable/workflows/create-durable-agentic-workflow.ts`:
- Around line 114-136: Extract the duplicated predicate into a single exported
constant (e.g., shouldPersistSnapshot) and use it in both createEventedWorkflow
calls (the one with id DurableStepIds.AGENTIC_EXECUTION /
singleIterationWorkflow and the main agentic loop workflow) instead of
duplicating the inline function; implement the constant with the same logic
(return true when params.workflowStatus is 'pending' || 'paused' || 'suspended')
and update the options.shouldPersistSnapshot references to call that shared
constant so maintenance of persistence rules is centralized.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d69fd6bf-cd0f-4890-b30b-0c99f85cde40
⛔ Files ignored due to path filters (1)
packages/mcp/integration-tests/pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (16)
.changeset/thick-signs-decide.mdpackages/core/src/agent/agent.tspackages/core/src/agent/durable/__tests__/durable-agent-background-tasks.test.tspackages/core/src/agent/durable/__tests__/durable-agent-request-context.test.tspackages/core/src/agent/durable/__tests__/durable-agent-stopwhen.test.tspackages/core/src/agent/durable/__tests__/resume-api.test.tspackages/core/src/agent/durable/durable-agent.tspackages/core/src/agent/durable/evented-agent.tspackages/core/src/agent/durable/workflows/create-durable-agentic-workflow.tspackages/core/src/agent/durable/workflows/steps/background-task-check.tspackages/core/src/agent/durable/workflows/steps/llm-execution.tspackages/core/src/agent/durable/workflows/steps/llm-mapping.tspackages/core/src/agent/durable/workflows/steps/scorer-execution.tspackages/core/src/agent/durable/workflows/steps/tool-call.tspackages/core/src/loop/workflows/stream.tspackages/core/src/workflows/evented/workflow-event-processor/index.ts
Summary
Migrates the Durable Agent's internal workflow engine from the synchronous
createWorkflow()to the eventedcreateEventedWorkflow()engine. This brings durability and observability improvements to durable agent runs by leveraging the same event-driven architecture used by standard workflows.What changed
createDurableAgenticWorkflownow usescreateEventedWorkflow()instead ofcreateWorkflow()DurableAgentlifecycle methods (stream,generate,resume) updated to work with the evented engine, includingmastra.startWorkers()and internal workflow registrationinputkey from the parent step's result in the persisted context, preventing data loss during step executionMastrainstance still get proper pubsub and storage for the evented engineTransparency
This change is fully transparent — existing Durable Agent usage continues to work without any API changes. The
shouldPersistSnapshotoption on the durable agentic workflow controls which statuses are persisted (pending,paused,suspended), matching the previous behavior.Test plan
tsc --noEmit)ELI5
This PR swaps the Durable Agent’s internal engine for an event-based one so long-running agent runs are more durable and observable; it happens behind the scenes with no API changes.
Summary
The Durable Agent’s internal workflow engine was migrated from synchronous createWorkflow() to the event-driven createEventedWorkflow(), improving durability, observability, and recovery while remaining backward-compatible.
Core Changes
Observability & Serialization
Test & Infrastructure Changes
Compatibility