Skip to content

Commit 6ed450a

Browse files
authored
Merge pull request storybookjs#34979 from storybookjs/jeppe-cursor/docgen-subscription-referential-equality-5a81
Open Service: Fix reactivity on deep signals, fire subscribers on load dependencies
2 parents 9040e16 + 35c1415 commit 6ed450a

15 files changed

Lines changed: 906 additions & 178 deletions

code/.storybook/open-service-debug-service.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise<StoryIndexGen
9797
output: v.undefined(),
9898
handler: async (input, ctx) => {
9999
logger.warn(`[open-service debug] command addActivity(${input.message})`);
100-
ctx.self.setState((draft) => {
101-
draft.activity.push(input.message);
100+
ctx.self.setState((state) => {
101+
state.activity.push(input.message);
102102
});
103103

104104
return undefined;
@@ -115,10 +115,10 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise<StoryIndexGen
115115
logger.warn(
116116
`[open-service debug] command syncStoryIndex(${input.reason}) => ${Object.keys(storyIndex.entries).length} entries`
117117
);
118-
ctx.self.setState((draft) => {
119-
draft.storyIndexEntryCount = Object.keys(storyIndex.entries).length;
120-
draft.storyIndexSampleIds = sampleIds;
121-
draft.activity.push(`syncStoryIndex:${input.reason}:${sampleIds.length}`);
118+
ctx.self.setState((state) => {
119+
state.storyIndexEntryCount = Object.keys(storyIndex.entries).length;
120+
state.storyIndexSampleIds = sampleIds;
121+
state.activity.push(`syncStoryIndex:${input.reason}:${sampleIds.length}`);
122122
});
123123

124124
return undefined;
@@ -137,10 +137,10 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise<StoryIndexGen
137137
logger.warn(
138138
`[open-service debug] command recordPreloadVisit(${input.entryId}, ${input.source}) => ${value}`
139139
);
140-
ctx.self.setState((draft) => {
141-
draft.preloadedByEntryId[input.entryId] = value;
142-
draft.lastObservedValue = value;
143-
draft.activity.push(`recordPreloadVisit:${input.entryId}:${input.source}`);
140+
ctx.self.setState((state) => {
141+
state.preloadedByEntryId[input.entryId] = value;
142+
state.lastObservedValue = value;
143+
state.activity.push(`recordPreloadVisit:${input.entryId}:${input.source}`);
144144
});
145145

146146
return undefined;

code/core/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@
233233
"!src/**/*"
234234
],
235235
"dependencies": {
236+
"@preact/signals-core": "^1.14.2",
236237
"@storybook/global": "^5.0.0",
237238
"@storybook/icons": "^2.0.2",
238239
"@testing-library/dom": "^10.4.1",
@@ -241,6 +242,7 @@
241242
"@vitest/expect": "3.2.4",
242243
"@vitest/spy": "3.2.4",
243244
"@webcontainer/env": "^1.1.1",
245+
"deepsignal": "^1.6.0",
244246
"esbuild": "^0.18.0 || ^0.19.0 || ^0.20.0 || ^0.21.0 || ^0.22.0 || ^0.23.0 || ^0.24.0 || ^0.25.0 || ^0.26.0 || ^0.27.0",
245247
"open": "^10.2.0",
246248
"oxc-parser": "^0.127.0",
@@ -304,7 +306,6 @@
304306
"@yarnpkg/libzip": "2.3.0",
305307
"acorn": "^8.15.0",
306308
"acorn-jsx": "^5.3.2",
307-
"alien-signals": "^3.2.0",
308309
"ansi-to-html": "^0.7.2",
309310
"browser-dtector": "^3.4.0",
310311
"bundle-require": "^5.1.0",

code/core/src/shared/open-service/README.md

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ Its goals are:
66

77
- define stateful services in one declarative object
88
- expose synchronous queries and async commands with strong TypeScript inference
9-
- validate all query and command input/output through Standard Schema
10-
- support reactive query subscriptions through `alien-signals`
9+
- validate all query and command input/output through Standard Schema (schemas may transform/coerce)
10+
- support fine-grained reactive query subscriptions through deep signals (`deepsignal` +
11+
`@preact/signals-core`)
1112
- support server-side static state snapshots driven by query `load` hooks
1213

1314
The main audience for this README is agents and maintainers who need to understand how the pieces
@@ -94,6 +95,15 @@ Query handlers do **not** receive `commands` or `setState`. Mutations belong in
9495

9596
`load` mutations must go through commands. Cross-service `getService(...).queries.*` calls inside a load body are not auto-tracked for the drain; use `await ctx.getService(id).queries.foo.loaded(input)` when you need a cross-service dependency awaited before your own load completes.
9697

98+
**`load` is a reactive, idempotent warming step.** For an active subscription, `load` re-fires whenever the external signals it reads synchronously change — same-service fields and cross-service reads via `getService(...).queries.*` alike — turning a query into a reactive async resource (like a TanStack Query / SolidJS `createResource` / Vue async `watchEffect`). This means:
99+
100+
- **`load` must be idempotent.** Re-running it with the same dependencies must produce the same state. Any genuinely one-shot side effect belongs in a command invoked conditionally, never in `load` itself.
101+
- **Read dependencies synchronously, up front.** Only reads in the load's synchronous prefix (before the first `await`) are tracked. Read the values you depend on first, then do async work — the same idiom every signal-based resource uses.
102+
- **Loads that read no external signal fire exactly once** (the common case: `await ctx.self.commands.x(input)`), so existing loads are unaffected.
103+
- Direct `query()` / `.loaded()` calls are **not** reactive — they keep one-shot-per-call semantics. Reactivity is scoped to subscriptions and is torn down when the last subscriber unsubscribes.
104+
105+
The runtime guards re-firing: a superseded run (its dependencies changed again before it finished) cannot overwrite a newer run's state, and changes batched together produce a single re-load.
106+
97107
**Keep `load` bodies as small as possible.** Almost always, `load` should be a one-liner that calls a command — the real work (input resolution, side effects, validation, state mutation) belongs in the command. This pays off for three reasons:
98108

99109
- **Reusability.** Anyone can call the command directly (other services, tests, integrations) without going through the query's load path. Logic stuck inside a load is unreachable from outside the drain.
@@ -158,7 +168,12 @@ Both must be Standard Schema compatible.
158168
The runtime validates:
159169

160170
- caller input before a handler runs
161-
- handler output before the result is returned or emitted
171+
- handler output when a value is produced for a consumer — a direct `query()` call, `query.loaded()`,
172+
the static build, and a subscription emission
173+
174+
Output validation reads the whole value, so it is kept out of the part of a subscription that
175+
determines reactive dependencies: for a `selector` subscriber it runs without tracking, so it cannot
176+
expand the deep-signal dependency footprint. (See "Subscription Flow".)
162177

163178
Queries validate **synchronously**. Their input and output schemas must produce sync results. If a Standard Schema returns a Promise during a query validation, the runtime throws `OpenServiceAsyncSchemaError` immediately.
164179

@@ -250,15 +265,37 @@ When an **async** `load` body runs, it instead gets a *wrapped* `ctx.self.querie
250265

251266
Cross-service `ctx.getService(id).queries.*` calls inside a load body are **not** wrapped; authors must use `.loaded()` explicitly when they need a cross-service dep awaited from inside a load. From a sync handler, cross-service queries are tracked because they consult the module-scoped session like any other call.
252267

268+
## State and reactivity
269+
270+
State is a **deep reactive proxy** (`deepSignal` from `deepsignal`, backed by `@preact/signals-core`)
271+
created in [service-runtime.ts](./service-runtime.ts). There is no top-level state atom and no Immer:
272+
273+
- Reading a field through `ctx.self.state` tracks a fine-grained signal for exactly that field
274+
(including not-yet-present record keys, which fire when the key is later added).
275+
- `setState((state) => …)` mutates the proxy **in place** inside a batch, so one command notifies
276+
subscribers once, and only the fields it actually changed are invalidated.
277+
- The proxy is internal and does not escape:
278+
- Query/`.loaded()` results are the schema-validated value. For object and array schemas that
279+
rebuild a plain value, this also detaches the result from the proxy.
280+
- Subscription emissions are detached to plain values (validated for whole-value subscribers, or
281+
JSON-stripped for `selector` slices).
282+
- The whole-state snapshot for the static build uses `structuredClone` of the plain backing
283+
object. (`structuredClone` cannot clone a proxy, so proxy-slice stripping uses a JSON round-trip;
284+
state must be JSON-serializable, the same constraint the static-build pipeline relies on.)
285+
253286
## Subscription Flow
254287

255-
Subscriptions are implemented with `alien-signals` in [service-runtime.ts](./service-runtime.ts):
288+
Subscriptions are implemented in [service-runtime.ts](./service-runtime.ts):
256289

257-
1. `subscribe(input, callback)` defers all work to a microtask.
258-
2. The microtask validates the input synchronously and fires the dependency's `load` in the background.
259-
3. A `computed()` value wraps the synchronous handler. An `effect()` runs the handler immediately (delivering the current value to the callback) and re-runs whenever the handler's tracked state dependencies change.
260-
4. Subscribers receive the current state right away, then a follow-up emission once the load settles and state changes. UI consumers that want to suppress the pre-load emission should branch on the value (e.g. show a spinner for `null`).
261-
5. Each emitted value is output-validated before the subscriber callback runs.
290+
1. `subscribe(input, callback)` (or `subscribe(input, selector, callback)`) defers all work to a microtask.
291+
2. The microtask validates the input synchronously. If the query has a `load`, it is run inside its own `effect()` so the external signals it reads synchronously are tracked: when they change, the effect re-runs and the load re-fires (see "Load"). Writes from a superseded run are dropped (each run carries an epoch; `setState` is gated on it), so a slow stale load can't clobber a newer result. The effect is torn down with the subscription.
292+
3. A `computed()` runs the synchronous handler against the deep-signal proxy, so its dependency footprint is exactly what it reads. The output is always validated, but where validation runs depends on the subscription:
293+
- **No selector:** the value is validated here and emitted. Reading the whole value to validate it is the correct footprint for a whole-value subscriber, and it keeps the emitted value identical to a direct `query()` pull.
294+
- **With a selector:** validation runs untracked (so it does not register dependencies) and only `selector(value)` is read (then detached to a plain snapshot), so a sibling field the selector ignores never re-runs the handler.
295+
4. An `effect()` runs the computed immediately (delivering the current value) and re-runs only when the computed's tracked fields change. A write to an unrelated key or field never re-runs the handler.
296+
5. Subscribers receive the current state right away, then a follow-up emission once the load settles and state changes. UI consumers that want to suppress the pre-load emission should branch on the value (e.g. show a spinner for `null`).
297+
6. Emissions are deduped by value: the effect compares the new value with the last emitted one via `es-toolkit` `isEqual` and skips the callback when they are equal. So a load that rewrites a deeply-equal value does not re-fire subscribers.
298+
7. The optional `selector` is the `universal-store` pattern: the callback receives the selected slice and fires only when that slice changes by value — and, because the selector drives the computed's reads, an unselected field change does not even re-run the handler.
262299

263300
Tests should use `vi.waitFor(...)` when asserting the first emission or follow-up emissions.
264301

@@ -363,8 +400,8 @@ export const exampleServiceDef = defineService({
363400
input: entryIdSchema,
364401
output: v.void(),
365402
handler: async (input, ctx) => {
366-
ctx.self.setState((draft) => {
367-
draft.values[input.entryId] = 'ready';
403+
ctx.self.setState((state) => {
404+
state.values[input.entryId] = 'ready';
368405
});
369406
},
370407
},

code/core/src/shared/open-service/fixtures.ts

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ export const mutableRecordLookupServiceDef = defineService({
4444
input: assignEntryFieldInputSchema,
4545
output: voidOutputSchema,
4646
handler: (input, ctx) => {
47-
ctx.self.setState((draft) => {
48-
draft[input.entryId] ??= {};
49-
draft[input.entryId]![input.fieldKey] = input.fieldValue;
47+
ctx.self.setState((state) => {
48+
state[input.entryId] ??= {};
49+
state[input.entryId]![input.fieldKey] = input.fieldValue;
5050
});
5151
},
5252
},
@@ -82,8 +82,8 @@ export const awaitedPreloadValueServiceDef = defineService({
8282
output: voidOutputSchema,
8383
handler: async (input, ctx) => {
8484
await Promise.resolve();
85-
ctx.self.setState((draft) => {
86-
draft[input.entryId] = 'preloaded';
85+
ctx.self.setState((state) => {
86+
state[input.entryId] = 'preloaded';
8787
});
8888
},
8989
},
@@ -116,14 +116,63 @@ export const fireAndForgetPreloadValueServiceDef = defineService({
116116
output: voidOutputSchema,
117117
handler: async (input, ctx) => {
118118
await Promise.resolve();
119-
ctx.self.setState((draft) => {
120-
draft[input.entryId] = 'preloaded';
119+
ctx.self.setState((state) => {
120+
state[input.entryId] = 'preloaded';
121121
});
122122
},
123123
},
124124
},
125125
});
126126

127+
export type RebuiltValue = { marker: string; count: number };
128+
export type RebuiltValueState = Record<string, RebuiltValue | undefined>;
129+
130+
/** Shared schema for the object value used by the rebuilt-equal-value fixture. */
131+
export const rebuiltValueOutputSchema = v.nullable(
132+
v.object({ marker: v.string(), count: v.number() })
133+
);
134+
135+
/**
136+
* Service fixture whose `load` rebuilds a deeply-equal but freshly-allocated object value every
137+
* time it runs, then writes it back via a command.
138+
*
139+
* It exercises the case where re-subscribing to an already-populated entry would emit a redundant
140+
* second value (the immediate emission plus a load-driven emission carrying an equal-but-not-
141+
* identical object) unless the runtime dedups by value.
142+
*/
143+
export const rebuiltEqualValueOnLoadServiceDef = defineService({
144+
id: 'internal-fixture/rebuilt-equal-value-on-load',
145+
description: 'Rewrites a deeply-equal but freshly-allocated object value on every load.',
146+
initialState: {} as RebuiltValueState,
147+
queries: {
148+
getRebuiltValue: {
149+
description: 'Returns the value for an entry; load always rewrites a fresh-but-equal value.',
150+
input: entryIdInputSchema,
151+
output: rebuiltValueOutputSchema,
152+
handler: (input, ctx) => ctx.self.state[input.entryId] ?? null,
153+
load: async (input, ctx) => {
154+
await ctx.self.commands.rebuildValue(input);
155+
},
156+
},
157+
},
158+
commands: {
159+
rebuildValue: {
160+
description: 'Allocates a brand-new object with a stable value and stores it.',
161+
input: entryIdInputSchema,
162+
output: rebuiltValueOutputSchema,
163+
handler: async (input, ctx) => {
164+
await Promise.resolve();
165+
// A new object literal every call: deeply equal to any prior value, never `===` to it.
166+
const value: RebuiltValue = { marker: 'stable', count: 1 };
167+
ctx.self.setState((state) => {
168+
state[input.entryId] = value;
169+
});
170+
return value;
171+
},
172+
},
173+
},
174+
});
175+
127176
export type SharedStaticFileState = { left?: string; right?: string };
128177

129178
/** Creates a fixture where multiple queries contribute state to one shared static file. */
@@ -162,8 +211,8 @@ export function createSharedStaticFileServiceDef() {
162211
input: noInputSchema,
163212
output: voidOutputSchema,
164213
handler: (_input, ctx) => {
165-
ctx.self.setState((draft) => {
166-
draft.left = 'preloaded';
214+
ctx.self.setState((state) => {
215+
state.left = 'preloaded';
167216
});
168217
},
169218
},
@@ -172,8 +221,8 @@ export function createSharedStaticFileServiceDef() {
172221
input: noInputSchema,
173222
output: voidOutputSchema,
174223
handler: (_input, ctx) => {
175-
ctx.self.setState((draft) => {
176-
draft.right = 'preloaded';
224+
ctx.self.setState((state) => {
225+
state.right = 'preloaded';
177226
});
178227
},
179228
},

code/core/src/shared/open-service/index.test-d.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ const openServiceDef = defineService({
6969
output: v.void(),
7070
handler: (input, ctx) => {
7171
expectTypeOf(input).toEqualTypeOf<number>();
72-
ctx.self.setState((draft) => {
73-
expectTypeOf(draft).toEqualTypeOf<OpenServiceState>();
74-
draft.count += input;
72+
ctx.self.setState((state) => {
73+
expectTypeOf(state).toEqualTypeOf<OpenServiceState>();
74+
state.count += input;
7575
});
7676
},
7777
},
@@ -80,9 +80,9 @@ const openServiceDef = defineService({
8080
output: v.void(),
8181
handler: async (input, ctx) => {
8282
expectTypeOf(input).toEqualTypeOf<{ entryId: string }>();
83-
ctx.self.setState((draft) => {
84-
expectTypeOf(draft.valuesById[input.entryId]).toEqualTypeOf<string | undefined>();
85-
draft.valuesById[input.entryId] = 'ready';
83+
ctx.self.setState((state) => {
84+
expectTypeOf(state.valuesById[input.entryId]).toEqualTypeOf<string | undefined>();
85+
state.valuesById[input.entryId] = 'ready';
8686
});
8787
},
8888
},

code/core/src/shared/open-service/server.test-d.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,16 @@ const registeredService = registerService(registrationOnlyServiceDef, {
6565
increment: {
6666
handler: (input, ctx) => {
6767
expectTypeOf(input).toEqualTypeOf<number>();
68-
ctx.self.setState((draft) => {
69-
draft.count += input;
68+
ctx.self.setState((state) => {
69+
state.count += input;
7070
});
7171
},
7272
},
7373
preloadValue: {
7474
handler: async (input, ctx) => {
7575
expectTypeOf(input).toEqualTypeOf<{ entryId: string }>();
76-
ctx.self.setState((draft) => {
77-
draft.valuesById[input.entryId] = 'ready';
76+
ctx.self.setState((state) => {
77+
state.valuesById[input.entryId] = 'ready';
7878
});
7979
},
8080
},

code/core/src/shared/open-service/server.test.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ describe('server static builds', () => {
121121
entryId: 'entry-a',
122122
});
123123

124-
ctx.self.setState((draft) => {
125-
draft.value = record?.marker ?? null;
124+
ctx.self.setState((state) => {
125+
state.value = record?.marker ?? null;
126126
});
127127

128128
return undefined;
@@ -167,8 +167,8 @@ describe('server static builds', () => {
167167
output: v.undefined(),
168168
handler: async (_input, ctx) => {
169169
readyEntryIds.splice(0, readyEntryIds.length, 'entry-a');
170-
ctx.self.setState((draft) => {
171-
draft.built = true;
170+
ctx.self.setState((state) => {
171+
state.built = true;
172172
});
173173

174174
return undefined;
@@ -200,8 +200,8 @@ describe('server static builds', () => {
200200
input: v.object({ entryId: v.string() }),
201201
output: v.undefined(),
202202
handler: async (input, ctx) => {
203-
ctx.self.setState((draft) => {
204-
draft.value = input.entryId;
203+
ctx.self.setState((state) => {
204+
state.value = input.entryId;
205205
});
206206

207207
return undefined;
@@ -281,8 +281,8 @@ describe('server static builds', () => {
281281
}),
282282
output: v.undefined(),
283283
handler: async (input, ctx) => {
284-
ctx.self.setState((draft) => {
285-
draft.value = input.value;
284+
ctx.self.setState((state) => {
285+
state.value = input.value;
286286
});
287287

288288
return undefined;
@@ -370,8 +370,8 @@ describe('server static builds', () => {
370370
input: v.undefined(),
371371
output: v.undefined(),
372372
handler: async (_input, ctx) => {
373-
ctx.self.setState((draft) => {
374-
draft.value = 'invalid';
373+
ctx.self.setState((state) => {
374+
state.value = 'invalid';
375375
});
376376

377377
return undefined;
@@ -427,8 +427,8 @@ describe('server static builds', () => {
427427
}),
428428
output: v.undefined(),
429429
handler: async (input, ctx) => {
430-
ctx.self.setState((draft) => {
431-
draft.value = input.value;
430+
ctx.self.setState((state) => {
431+
state.value = input.value;
432432
});
433433

434434
return undefined;

0 commit comments

Comments
 (0)