
ClusterWorkflowEngine: parent resume after the first child-await waits for entityMessagePollInterval (sendResumeParent omits pollStorage after reset) #6294

Description

@IgnisDa

Summary

With ClusterWorkflowEngine, a workflow that awaits more than one child workflow stalls for ~entityMessagePollInterval (default 10s) on every resume after the first. The first child-await resumes the parent promptly. The second and each later one is not woken via the in-process storage-read latch and instead waits for the periodic storage poll. This is independent of load: it reproduces on a single idle node.

Environment

  • @effect/cluster 0.59.0
  • @effect/workflow 0.18.2
  • effect 3.21.3
  • Runner: SingleRunner.layer({ runnerStorage: "sql" }) (single node, SQL MessageStorage).

Root cause

When a child workflow completes, ClusterWorkflowEngine resumes its parent via sendResumeParent. Paraphrasing the 0.59.0 dist/esm/ClusterWorkflowEngine.js logic:

const requestId = yield* requestIdFor({ /* parent */ tag: "resume", id: "" })
if (Option.isNone(requestId)) {
  // FIRST resume: send a fresh `resume` message through the client.
  // → Sharding.sendOutgoing → notifyLocal → opens storageReadLatch → prompt.
  return yield* client.resume({}, { discard: true })
}
const reply = yield* replyForRequestId(requestId.value)
if (Option.isNone(reply)) return
// SECOND+ resume: the parent already has a processed `resume` request →
yield* sharding.reset(requestId.value)   // clears the reply, but never opens the latch

sharding.reset(requestId) is just storage.clearReplies(requestId): it marks the request for reprocessing but does not open storageReadLatch. The cleared request is therefore re-read only when the storage-read loop's timer next opens the latch, i.e. once every entityMessagePollInterval (ShardingConfig default Duration.seconds(10)).

For contrast, the engine's own resume() helper (invoked by the deferred/resume RPC handlers) performs the same reset and then opens the latch:

// ClusterWorkflowEngine resume()
yield* sharding.reset(Snowflake.Snowflake(maybeSuspended.value.requestId))
yield* sharding.pollStorage   // == storageReadLatch.open  ← missing in sendResumeParent's reset branch

So sendResumeParent's reset branch is missing the equivalent sharding.pollStorage that every other reset call site pairs with.

(Minor related note: the middle branch, where reply is None, returns without doing anything, on the assumption that an in-flight resume will cover it. The high-impact, deterministic bug is the reset branch above.)
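To make the timing difference between the code paths concrete, here is a hypothetical, simplified model of when the storage-read loop picks up the parent's `resume` request. It is plain TypeScript, not @effect/cluster code. The names `ResumePath` and `pickupAtMs` are illustrative. It also assumes that poll-timer ticks fire at fixed multiples of `entityMessagePollInterval` measured from t = 0, which is a simplification of the real loop.

```typescript
// Hypothetical model of the storage-read loop's wake-up behaviour.
// Not @effect/cluster code: names and the fixed-tick assumption are illustrative.
type ResumePath =
  | "send"       // first resume: client.resume -> sendOutgoing -> notifyLocal opens the latch
  | "reset"      // later resumes today: sharding.reset only, the latch stays closed
  | "reset+poll" // resume() helper / proposed fix: reset followed by sharding.pollStorage

// Returns the time (ms) at which the loop re-reads the parent's request.
function pickupAtMs(path: ResumePath, childDoneAtMs: number, pollIntervalMs: number): number {
  if (path !== "reset") {
    // The latch is opened right away, so the loop reads immediately.
    return childDoneAtMs
  }
  // Only the periodic timer wakes the loop: wait for the next tick after completion.
  // Assumes ticks at 0, pollIntervalMs, 2 * pollIntervalMs, ...
  return (Math.floor(childDoneAtMs / pollIntervalMs) + 1) * pollIntervalMs
}

const POLL_MS = 10_000 // ShardingConfig default entityMessagePollInterval
for (const path of ["send", "reset", "reset+poll"] as const) {
  const delayMs = pickupAtMs(path, 400, POLL_MS) - 400
  console.log(`${path}: parent picked up ${delayMs}ms after the child completed`)
}
```

In this model, a child completing at 400 ms is picked up immediately on the `send` and `reset+poll` paths. On the bare `reset` path it waits for the next tick, 9600 ms later.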

Reproduction

A parent workflow that awaits two child workflows in sequence. Each child does one durable step so the parent genuinely suspends and is resumed once per child:

const Child = Workflow.make({
  name: "Child",
  success: Schema.Void,
  payload: Schema.Struct({ n: Schema.Number }),
})
const Parent = Workflow.make({ name: "Parent", success: Schema.Void, payload: Schema.Struct({}) })

const ChildLive = Child.toLayer(({ n }) =>
  Activity.make({ name: `work-${n}`, execute: Effect.sleep("200 millis") }),
)

const ParentLive = Parent.toLayer(() =>
  Effect.gen(function* () {
    const engine = yield* WorkflowEngine
    yield* Effect.log("parent: awaiting child-1")
    yield* engine.execute(Child, { executionId: "child-1", payload: { n: 1 } }) // resumes promptly
    yield* Effect.log("parent: awaiting child-2")
    yield* engine.execute(Child, { executionId: "child-2", payload: { n: 2 } }) // ~10s before this resumes
    yield* Effect.log("parent: done")
  }),
)

Run Parent once on SingleRunner.layer({ runnerStorage: "sql" }). The parent body replays on each resume; the wall-clock gap between resuming for child-1 and resuming for child-2 is ~entityMessagePollInterval (~10s with defaults), even though child-2 finishes in ~200ms.
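One way to confirm the stall on your own run is to compare the timestamps of the parent's log lines. Because the body replays on each resume, the lines repeat, and the stall appears as one long gap between consecutive lines. The helper below is a hypothetical sketch: the `LogEntry` shape and the `stalledGaps` name are assumptions, and the code does not parse timestamps out of Effect's logger output.

```typescript
// Hypothetical helper: flag gaps between consecutive parent log lines that are
// much longer than the child work (~200 ms per child in the reproduction).
interface LogEntry {
  atMs: number    // timestamp of the log line, in milliseconds
  message: string // e.g. "parent: awaiting child-2"
}

interface Gap {
  after: string  // message logged just before the gap
  before: string // message logged just after the gap
  gapMs: number
}

function stalledGaps(entries: readonly LogEntry[], thresholdMs: number): Gap[] {
  // Sort by time so out-of-order log collection does not hide a gap.
  const sorted = [...entries].sort((a, b) => a.atMs - b.atMs)
  const gaps: Gap[] = []
  let prev: LogEntry | undefined
  for (const cur of sorted) {
    if (prev !== undefined && cur.atMs - prev.atMs > thresholdMs) {
      gaps.push({ after: prev.message, before: cur.message, gapMs: cur.atMs - prev.atMs })
    }
    prev = cur
  }
  return gaps
}
```

Suppose the threshold is a couple of seconds. A run with prompt resumes should then report no gaps. The behaviour described in this issue should report one gap of roughly entityMessagePollInterval, starting right after `parent: awaiting child-2`.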

Expected vs. actual

  • Expected: every child completion resumes the parent promptly, as the first does.
  • Actual: the second and later child completions resume the parent only on the next entityMessagePollInterval storage poll (~10s with defaults).

Suggested fix

In sendResumeParent, open the storage-read latch after the reset, mirroring resume():

yield* sharding.reset(requestId.value)
yield* sharding.pollStorage   // reprocess the reset request immediately instead of waiting for the poll
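The expected effect of this change on the reproduction can be illustrated with a small timing simulation. It is hypothetical plain TypeScript, not the engine. A parent awaits children in sequence, and the first resume takes the fresh-send path. Later resumes either wait for the poll timer (current behaviour) or open the latch right after the reset (the suggested fix). The model assumes poll ticks at fixed multiples of the interval from t = 0 and ignores replay and scheduling overhead.

```typescript
// Hypothetical simulation of a parent awaiting children one after another.
// Assumptions: poll-timer ticks fire at 0, pollIntervalMs, 2 * pollIntervalMs, ...
// and replaying the parent body takes no time.
function parentResumeTimesMs(
  childDurationsMs: readonly number[],
  pollIntervalMs: number,
  pollAfterReset: boolean, // true models adding `yield* sharding.pollStorage` after the reset
): number[] {
  const resumes: number[] = []
  let now = 0 // time at which the parent starts the next child
  childDurationsMs.forEach((durationMs, index) => {
    const childDoneAt = now + durationMs
    // First child: a fresh `resume` send opens the latch. Later children: the reset branch.
    const latchOpened = index === 0 || pollAfterReset
    const resumedAt = latchOpened
      ? childDoneAt
      : (Math.floor(childDoneAt / pollIntervalMs) + 1) * pollIntervalMs
    resumes.push(resumedAt)
    now = resumedAt // the parent replays and immediately starts the next child
  })
  return resumes
}

parentResumeTimesMs([200, 200, 200], 10_000, false) // returns [200, 10000, 20000]
parentResumeTimesMs([200, 200, 200], 10_000, true)  // returns [200, 400, 600]
```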

Workaround

Lower the poll interval so the fallback fires sooner:

SingleRunner.layer({
  runnerStorage: "sql",
  shardingConfig: { entityMessagePollInterval: Duration.millis(250) },
})
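Lowering the interval trades the stall for more frequent storage reads. The back-of-the-envelope below is a hypothetical sketch that assumes one storage read per idle timer tick, which may not match the actual loop exactly. Under that assumption, the worst-case extra wait shrinks in proportion to the interval, while the idle read rate grows by the inverse factor.

```typescript
// Rough trade-off of the workaround.
// Assumption: one storage read per idle timer tick (not taken from @effect/cluster).
function pollTradeoff(pollIntervalMs: number): { worstCaseStallMs: number; idleReadsPerMinute: number } {
  return {
    // A reset that lands just after a tick waits almost a full interval.
    worstCaseStallMs: pollIntervalMs,
    idleReadsPerMinute: 60_000 / pollIntervalMs,
  }
}

console.log(pollTradeoff(10_000)) // { worstCaseStallMs: 10000, idleReadsPerMinute: 6 }
console.log(pollTradeoff(250))    // { worstCaseStallMs: 250, idleReadsPerMinute: 240 }
```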
