is() can hang when publish lands between valueOf() check and each() subscription #217

@scottkgaran

Summary

is() from @effectionx/signals can hang indefinitely even though stream.valueOf() already satisfies the predicate. This happens when a concurrent task publishes between is()'s synchronous valueOf() check and its each() subscription: a classic time-of-check to time-of-use (TOCTOU) race.

We hit this in production test code (Duplo) when using is() for cross-task synchronization under Effection v4 structured concurrency. We worked around it by switching to Effection Channel for in-operation sync, but is() appears intended for exactly this use case per the README and .policies/no-sleep-test-sync.md.

Expected behavior

is(signal, predicate) should resolve once the signal's value satisfies the predicate — including when a publish occurs concurrently, as long as valueOf() would reflect that state.

Actual behavior

The operation hangs forever. valueOf() shows the updated state, but is() never completes because the publish happened before its subscription existed and was dropped.

Root cause

is() is a two-step check-then-subscribe:

export function* is<T>(stream: ValueSignal<T>, predicate: (item: T) => boolean): Operation<void> {
  const result = predicate(stream.valueOf());
  if (result) return;

  for (const value of yield* each(stream)) {  // subscription starts here
    const result = predicate(value);
    if (result) return;
    yield* each.next();
  }
}

Under Effection v4:

  1. createSignal().send() is fire-and-forget: it delivers to current subscribers only, with no replay buffer (the Effection createSignal docs explicitly warn against using signals for in-operation messaging and recommend Channel instead).
  2. each(stream) spawns subscription setup and blocks on subscription.next() for the first stream event before the loop body runs.
  3. A spawned sibling runs on the next yield. If it calls signal.push() / signal.set() in the gap between is()'s valueOf() check (which returned false) and the point where each() has actually subscribed, the event reaches zero subscribers and is lost.
  4. is() then waits on each() for a later event that never comes, even though valueOf() already reflects the mutation.
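
The sequence above can be reproduced without Effection in a small toy model. This is only a sketch: ToySignal, ToyListener, and checkThenSubscribe are hypothetical names invented for illustration, not part of @effectionx/signals or effection, and the `gap` callback stands in for the sibling task that runs between the check and the subscription.

```typescript
// Toy model for illustration only; this is NOT the @effectionx/signals code.
type ToyListener<T> = (value: T) => void;

class ToySignal<T> {
  private current: T;
  private listeners = new Set<ToyListener<T>>();

  constructor(initial: T) {
    this.current = initial;
  }

  valueOf(): T {
    return this.current;
  }

  // Fire-and-forget: update state, notify whoever is subscribed right now.
  set(value: T): void {
    this.current = value;
    for (const listener of this.listeners) listener(value);
  }

  subscribe(listener: ToyListener<T>): void {
    this.listeners.add(listener);
  }
}

// Hypothetical waiter that mirrors the check-then-subscribe shape of is().
// `gap` simulates a sibling task that runs between the two steps.
function checkThenSubscribe(
  signal: ToySignal<number[]>,
  gap: () => void,
): { resolved: boolean } {
  const state = { resolved: false };
  // Time of check: the predicate is false, so we fall through.
  if (signal.valueOf().length >= 1) {
    state.resolved = true;
    return state;
  }
  // The producer publishes here, while nobody is subscribed.
  gap();
  // Time of use: the subscription starts after the event is already gone.
  signal.subscribe((xs) => {
    if (xs.length >= 1) state.resolved = true;
  });
  return state;
}

const toySignal = new ToySignal<number[]>([]);
const waiter = checkThenSubscribe(toySignal, () => toySignal.set([42]));

// The waiter is still unresolved although the state already holds the value.
console.log(waiter.resolved, toySignal.valueOf());
```

The model ignores scheduling entirely; it only shows that a publish with no replay buffer cannot be observed by a subscriber that registers afterwards.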

Minimal repro

@effectionx/signals@0.5.3, effection@4.x:

import { createArraySignal, is } from "@effectionx/signals";
import { race, run, sleep, spawn } from "effection";

await run(function* () {
  let snapshot: readonly number[] | undefined;

  const outcome = yield* race([
    (function* () {
      const signal = yield* createArraySignal<number>([]);

      yield* spawn(function* () {
        signal.push(42);
        snapshot = signal.valueOf();
      });

      // spawn fires on the next yield — inside is() between valueOf() and each()
      yield* is(signal, (xs) => xs.length >= 1);
      return "is-resolved" as const;
    })(),
    (function* () {
      yield* sleep(100);
      return "timed-out" as const;
    })(),
  ]);

  // outcome === "timed-out" — is() hung
  // snapshot === [42] — state changed but waiter missed the event
});

Note: is() does work when the value is already present before subscribing (straight-line push then is), and when the producer yields first (see below).

Why existing tests may not catch this

Several effectionx tests avoid the race by yielding before publishing:

  • signals/helpers.test.ts — producer does yield* sleep(0) before open.set(true), giving is() time to subscribe
  • stream-helpers/test-helpers/faucet.test.ts — same sleep(0) before pour()

These tests pass reliably, but the extra yield masks the TOCTOU race, which only appears when the spawned producer publishes on the very next scheduler turn (no sleep(0)).

Relation to other work

This looks related to #213 (moving replay semantics out of signals into stream-helpers / createSubject), which notes that signal.send() within a running operation can miss subscribers because the drain can't get a scheduling turn. is() has the same class of problem at the subscription boundary rather than the signal primitive itself.

The .policies/no-sleep-test-sync.md policy recommends is(signal, predicate) for waiting on signal state in tests. That guidance is unsafe without an additional yield (e.g. sleep(0)) in the producer, which partially defeats the purpose of deterministic sync.

Impact

Using is() for cross-task test synchronization (spawn handler → wait for signal state) is unreliable under Effection v4's cooperative scheduling. Callers must either:

  • add sleep(0) in the producer (fragile, timing-dependent), or
  • use Channel / Queue with subscribe-first discipline, or
  • use withResolvers() / other helpers that don't have a check-then-subscribe gap

Suggested directions

  1. Re-check valueOf() after subscribing: subscribe first (or use a replay-aware primitive), then check the current value before waiting for the next event.
  2. Document as limitation: if is() is only safe when the predicate is already true or the publisher yields first, say so explicitly in the README and policy docs, and remove it from the "deterministic test sync" recommendations.
  3. Align with #213 (♻️ Move replay semantics from signals to stream-helpers): if replay/late-subscriber semantics move to stream-helpers, is() may need to consume a replay-capable stream rather than a raw ValueSignal.
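
Direction 1 can be sketched in a toy model that does not use Effection. ToySignal, ToyListener, and waitUntil are hypothetical names, not library API, and the sketch assumes that registering a subscriber is synchronous. Whether each() or a direct stream subscription in Effection v4 gives that guarantee is not confirmed here, so a real fix would have to ensure the subscription is registered before any point where a sibling can run.

```typescript
// Toy model for illustration only; this is NOT the @effectionx/signals code.
type ToyListener<T> = (value: T) => void;

class ToySignal<T> {
  private current: T;
  private listeners = new Set<ToyListener<T>>();

  constructor(initial: T) {
    this.current = initial;
  }

  valueOf(): T {
    return this.current;
  }

  // Fire-and-forget: update state, notify whoever is subscribed right now.
  set(value: T): void {
    this.current = value;
    for (const listener of this.listeners) listener(value);
  }

  // Returns a function that removes the listener again.
  subscribe(listener: ToyListener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }
}

// Hypothetical helper: subscribe first, then re-check the current value.
// `gap` simulates a sibling task that runs right after the subscription.
function waitUntil<T>(
  signal: ToySignal<T>,
  predicate: (value: T) => boolean,
  gap: () => void = () => {},
): { resolved: boolean } {
  const state = { resolved: false };
  const unsubscribe = signal.subscribe((value) => {
    if (predicate(value)) {
      state.resolved = true;
      unsubscribe();
    }
  });
  // A publish here is delivered, because the listener already exists.
  gap();
  // A publish that happened before subscribing is caught by the re-check.
  if (!state.resolved && predicate(signal.valueOf())) {
    state.resolved = true;
    unsubscribe();
  }
  return state;
}

const first = new ToySignal<number[]>([]);
const duringGap = waitUntil(first, (xs) => xs.length >= 1, () => first.set([42]));

const second = new ToySignal<number[]>([]);
second.set([7]); // published before any subscriber exists
const beforeSubscribe = waitUntil(second, (xs) => xs.length >= 1);

// Both waiters resolve: one through the listener, one through the re-check.
console.log(duringGap.resolved, beforeSubscribe.resolved);
```

With the order reversed, every publish either arrives through the live subscription or is already visible in valueOf() at the re-check, so the window between check and subscription closes.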

Environment

  • @effectionx/signals: 0.5.3
  • effection: 4.x (v4 structured concurrency / cooperative scheduling)
  • Reproducible in Deno and Node
