diff --git a/docs/internals/conversation-tree.md b/docs/internals/conversation-tree.md index 83c5b36..8a48b89 100644 --- a/docs/internals/conversation-tree.md +++ b/docs/internals/conversation-tree.md @@ -16,10 +16,11 @@ Note that serial order is not necessarily delivery order - messages published co ## Data structures ``` -_nodeIndex: Map Primary index -_sortedList: InternalNode[] All nodes, sorted by serial -_parentIndex: Map> Children of each parent -_selections: Map Selected sibling at each fork +_nodeIndex: Map Primary index +_sortedList: InternalNode[] All nodes, sorted by serial +_parentIndex: Map> Children of each parent +_selections: Map Selected sibling at each fork +_structuralVersion: number Monotonic counter (see below) ``` Each `MessageNode` stores: @@ -52,6 +53,10 @@ Each `MessageNode` stores: Serial promotion handles the common case where a client inserts an optimistic message (null serial), then the server publishes it to the channel (with serial). The node moves from the end of the sorted list to its correct serial-order position. +### Structural version + +The tree maintains a `structuralVersion` counter (exposed via `TreeInternal`) that increments on changes affecting the `flattenNodes()` output structure - node inserts, deletions, and serial promotions (which reorder the sorted list). Content-only updates (replacing an existing node's message) do not increment the counter. The [View](message-lifecycle.md#cached-message-list) uses this to skip full tree walks during streaming - when only message content changed, the cached node list is still structurally valid. + ## Sibling groups and fork chains When a user calls `regenerate(msgId)` or `edit(msgId)`, the new message carries an [`x-ably-fork-of`](wire-protocol.md#branching-headers) header pointing to `msgId`. Messages that fork the same target (or transitively fork each other) form a **sibling group** - alternative messages at the same point in the conversation. diff --git a/docs/internals/glossary.md b/docs/internals/glossary.md index 9aeaa3e..b3840cb 100644 --- a/docs/internals/glossary.md +++ b/docs/internals/glossary.md @@ -118,8 +118,8 @@ A codec-provided component that assembles [decoder outputs](decoder.md#decoder-o ### Message materialization -The act of producing a flat message list from the [conversation tree](conversation-tree.md) via [`flattenNodes()`](#flatten). `flattenNodes()` returns `MessageNode[]`. Every call rebuilds from scratch - there is no cached list - because the result depends on branch selection state. All consumers go through the view's `flattenNodes()`: React hooks, `send()` (for the HTTP POST body), `view.loadOlder()` (for pagination snapshots). See [Message lifecycle](message-lifecycle.md#why-no-cached-message-list). +The act of producing a flat message list from the [conversation tree](conversation-tree.md) via [`flattenNodes()`](#flatten). `flattenNodes()` returns `MessageNode[]`. The View caches the result and returns it in O(1) on subsequent calls. The cache is refreshed when the tree structure changes (new nodes, deletions, selection changes, history reveal). All consumers go through the view's `flattenNodes()`: React hooks, `send()` (for the HTTP POST body), `view.loadOlder()` (for pagination snapshots). See [Message lifecycle](message-lifecycle.md#cached-message-list). ### Flatten -`view.flattenNodes()` - the sole path from tree state to a message array. (`flattenNodes()` is internal to `TreeInternal`, not on the public `Tree` interface.) Walks the sorted node list, checks parent reachability and sibling selection, and returns the linear message sequence for the currently selected conversation path. See [Conversation tree: flatten](conversation-tree.md#flatten-producing-the-linear-path). +`view.flattenNodes()` - the sole path from tree state to a message array. Returns the View's cached node list in O(1). The cache is rebuilt by an internal `_computeFlatNodes()` method that walks the sorted node list, checks parent reachability and sibling selection, and produces the linear message sequence for the currently selected conversation path. (`flattenNodes()` on `TreeInternal` does the actual tree walk; the View's public method returns cached results.) See [Conversation tree: flatten](conversation-tree.md#flatten-producing-the-linear-path). diff --git a/docs/internals/message-lifecycle.md b/docs/internals/message-lifecycle.md index f5d8ab3..9c0bb43 100644 --- a/docs/internals/message-lifecycle.md +++ b/docs/internals/message-lifecycle.md @@ -118,13 +118,23 @@ Every `'update'` event triggers a full `flattenNodes()` call, which rebuilds the Each turn needs its own accumulator because events from interleaved concurrent turns would corrupt each other's message assembly - a text-delta from turn A would be accumulated into turn B's message. -## Why no cached message list +## Cached message list -The tree is a DAG with branch selection state. The "current conversation" depends on which sibling is selected at each fork point. There is no single cached `TMessage[]` - every call to `flattenNodes()` rebuilds from scratch. +The View caches the result of `flattenNodes()` in a `_cachedNodes` field. The public `flattenNodes()` method returns this cache in O(1). The cache is refreshed by `_computeFlatNodes()` - a private method that performs the actual tree walk - whenever the visible output may have changed: -This is a deliberate tradeoff: no cache invalidation complexity, at the cost of repeated traversals. Since message counts are conversation-sized (tens to low hundreds), this is cheap. +| Trigger | What refreshes the cache | +| ------------------------------------------------------------- | ----------------------------------------------------- | +| Tree structural change (new node, deletion, serial promotion) | `_onTreeUpdate()` calls `_computeFlatNodes()` | +| Content-only update (streaming token) | `_onTreeUpdate()` shallow-copies the cached array | +| Branch selection change | `select()` calls `_computeFlatNodes()` | +| Fork auto-selection after `send()` | `send()` auto-select path calls `_computeFlatNodes()` | +| History page revealed | `_releaseWithheld()` calls `_computeFlatNodes()` | -All consumers go through `view.flattenNodes()`: +### Content-only fast path + +The tree exposes a [`structuralVersion`](conversation-tree.md#structural-version) counter that increments on insert, delete, and serial promotion - but not on content-only message updates. When `_onTreeUpdate()` sees the version unchanged, it skips the full tree walk entirely. The cached node list is still structurally valid because only a message reference changed on an existing `MessageNode`. The View compares each cached node's `.message` against the last-seen snapshot to detect which message changed, creates a new array reference (`[...cache]`) so React sees a state change, and emits `'update'`. This reduces the streaming hot path from O(total_nodes) to O(visible_count). + +All consumers go through the cached `view.flattenNodes()`: | Consumer | When it calls `flattenNodes()` | | --------------------------- | ------------------------------------------------- | @@ -133,4 +143,6 @@ All consumers go through `view.flattenNodes()`: | `send()` / `regenerate()` | To build the HTTP POST body's message history | | `view.loadOlder()` | To snapshot the current tree state for pagination | +Because all consumers read the cache, a structural tree update triggers one tree walk (inside the View), not one per consumer. Content-only updates (streaming tokens) trigger zero tree walks - only a reference comparison over visible messages. React hooks calling `flattenNodes()` after an `'update'` event get the pre-computed result without a redundant traversal. + See [Conversation tree](conversation-tree.md) for how `flattenNodes()` works. See [Codec interface](codec-interface.md#accumulator) for the accumulator's role. See [History hydration](history.md) for the history decode pipeline. diff --git a/src/core/transport/tree.ts b/src/core/transport/tree.ts index 2bd77cf..8d60ca3 100644 --- a/src/core/transport/tree.ts +++ b/src/core/transport/tree.ts @@ -39,6 +39,14 @@ interface InternalNode { /** Internal tree surface used by View — not part of the public Tree API. */ export interface TreeInternal extends Tree { + /** + * Monotonic counter that increments on structural changes (node insert, + * delete, serial promotion/reorder) but NOT on content-only updates + * (existing node's message replaced). Allows the View to skip full + * tree walks when only message content changed. + */ + readonly structuralVersion: number; + /** * Flatten the tree along selected branches into a linear node list. * The `selections` map provides the selected sibling's msgId at each @@ -108,6 +116,13 @@ export class DefaultTree implements TreeInternal { /** Monotonically increasing counter for insertion sequence. */ private _seqCounter = 0; + /** Incremented on structural changes; unchanged on content-only updates. */ + private _structuralVersion = 0; + + get structuralVersion(): number { + return this._structuralVersion; + } + constructor(logger: Logger) { this._logger = logger; this._emitter = new EventEmitter(logger); @@ -394,6 +409,7 @@ export class DefaultTree implements TreeInternal { // Re-sort: remove from current position, re-insert at correct position. this._removeSorted(existing); this._insertSorted(existing); + this._structuralVersion++; } this._emitter.emit('update'); return; @@ -415,6 +431,7 @@ export class DefaultTree implements TreeInternal { this._nodeIndex.set(msgId, internal); this._addToParentIndex(parentId, msgId); this._insertSorted(internal); + this._structuralVersion++; this._emitter.emit('update'); } @@ -437,6 +454,7 @@ export class DefaultTree implements TreeInternal { // Children are NOT deleted — they become unreachable in flattenNodes() // because their parent is no longer on the active path. + this._structuralVersion++; this._emitter.emit('update'); } diff --git a/src/core/transport/view.ts b/src/core/transport/view.ts index 6c13b62..0474ac5 100644 --- a/src/core/transport/view.ts +++ b/src/core/transport/view.ts @@ -145,6 +145,16 @@ export class DefaultView implements View { /** Unsubscribe functions for tree event subscriptions. */ private readonly _unsubs: (() => void)[] = []; + /** + * Cached result of the last flattenNodes computation. Public `flattenNodes()` + * returns this in O(1); internal callers use `_computeFlatNodes()` when a + * fresh tree walk is needed (structural changes, selection changes, history reveal). + */ + private _cachedNodes: MessageNode[] = []; + + /** Last seen tree structural version - used to distinguish content-only from structural updates. */ + private _lastStructuralVersion = -1; + private _loadingOlder = false; private _processingHistory = false; private _closed = false; @@ -159,8 +169,10 @@ export class DefaultView implements View { this._logger.trace('DefaultView();'); this._emitter = new EventEmitter(this._logger); - // Snapshot initial visible state - this._updateVisibleSnapshot(); + // Compute initial cache and snapshot visible state + this._cachedNodes = this._computeFlatNodes(); + this._lastStructuralVersion = this._tree.structuralVersion; + this._updateVisibleSnapshot(this._cachedNodes); // Subscribe to tree events and re-emit scoped versions this._unsubs.push( @@ -186,6 +198,17 @@ export class DefaultView implements View { // Spec: AIT-CT9, AIT-CT11c flattenNodes(): MessageNode[] { + return this._cachedNodes; + } + + /** + * Walk the tree and compute a fresh visible node list, applying branch + * selections and withheld-message filtering. Use this instead of the + * public `flattenNodes()` when the cache may be stale (structural + * changes, selection changes, history reveal). + * @returns A fresh array of visible nodes. + */ + private _computeFlatNodes(): MessageNode[] { const nodes = this._tree.flattenNodes(this._resolveSelections()); if (this._withheldMsgIds.size === 0) return nodes; return nodes.filter((n) => !this._withheldMsgIds.has(n.msgId)); @@ -255,7 +278,8 @@ export class DefaultView implements View { if (!selected) return; // unreachable: clamped is always in bounds this._branchSelections.set(groupRootId, { kind: 'user', selectedId: selected.msgId }); this._logger.debug('DefaultView.select();', { msgId, index: clamped, selectedId: selected.msgId }); - this._updateVisibleSnapshot(); + this._cachedNodes = this._computeFlatNodes(); + this._updateVisibleSnapshot(this._cachedNodes); this._emitter.emit('update'); } @@ -311,7 +335,8 @@ export class DefaultView implements View { const lastMsgId = result.optimisticMsgIds.at(-1); if (lastMsgId) { this._branchSelections.set(groupRoot, { kind: 'auto', selectedId: lastMsgId }); - this._updateVisibleSnapshot(); + this._cachedNodes = this._computeFlatNodes(); + this._updateVisibleSnapshot(this._cachedNodes); this._emitter.emit('update'); } } else { @@ -584,7 +609,8 @@ export class DefaultView implements View { this._withheldMsgIds.delete(n.msgId); } if (nodes.length > 0) { - this._updateVisibleSnapshot(); + this._cachedNodes = this._computeFlatNodes(); + this._updateVisibleSnapshot(this._cachedNodes); this._emitter.emit('update'); } } @@ -615,16 +641,38 @@ export class DefaultView implements View { // updates arriving during the async history fetch are still forwarded. if (this._processingHistory) return; + const currentVersion = this._tree.structuralVersion; + + // Content-only fast path: the tree structure hasn't changed (no new + // nodes, deletions, or serial reorders), so the cached node list is + // still structurally valid. The tree mutated an existing node's + // .message in place - check if any visible message reference changed. + // JS single-threaded: structuralVersion cannot change between the + // check and the response within this synchronous handler invocation. + if (currentVersion === this._lastStructuralVersion) { + const changed = this._cachedNodes.some((node, i) => node.message !== this._lastVisibleMessages[i]); + if (changed) { + this._lastVisibleMessages = this._cachedNodes.map((n) => n.message); + this._cachedNodes = [...this._cachedNodes]; + this._emitter.emit('update'); + } + return; + } + + // Structural update: full re-walk required. + this._lastStructuralVersion = currentVersion; + // Pin selections for previously-visible nodes that now have siblings. // This prevents new forks (from other views' edits/regenerates) from // shifting this view to a branch the user didn't navigate to. this._pinBranchSelections(); this._resolvePendingSelections(); - const nodes = this.flattenNodes(); + const nodes = this._computeFlatNodes(); const newIds = nodes.map((n) => n.msgId); const newMessages = nodes.map((n) => n.message); if (this._visibleChanged(newIds, newMessages)) { + this._cachedNodes = nodes; this._updateVisibleSnapshot(nodes); this._emitter.emit('update'); } diff --git a/test/core/transport/view.test.ts b/test/core/transport/view.test.ts index ef6275c..647f782 100644 --- a/test/core/transport/view.test.ts +++ b/test/core/transport/view.test.ts @@ -740,7 +740,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: createMockSendDelegate(), - logger: silentLogger, }); @@ -767,7 +766,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: createMockSendDelegate(), - logger: silentLogger, }); @@ -801,7 +799,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: createMockSendDelegate(), - logger: silentLogger, }); @@ -861,7 +858,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: forkDelegate, - logger: silentLogger, }); @@ -897,7 +893,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: noopDelegate, - logger: silentLogger, }); @@ -972,7 +967,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: noopDelegate, - logger: silentLogger, }); @@ -1041,7 +1035,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: noopDelegate, - logger: silentLogger, }); @@ -1157,7 +1150,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: createMockSendDelegate(), - logger: silentLogger, }); @@ -1185,7 +1177,6 @@ describe('DefaultView', () => { channel: createMockChannel(), codec: createMockCodec(), sendDelegate: mockDelegate, - logger: silentLogger, }); // Seed a linear chain: m1 -> m2 -> m3 @@ -1387,4 +1378,103 @@ describe('DefaultView', () => { await expect(view.edit('m1', { id: '2', content: 'revised' })).rejects.toThrow('view is closed'); }); }); + + // ------------------------------------------------------------------------- + // flattenNodes caching and reference stability + // ------------------------------------------------------------------------- + + describe('flattenNodes caching and reference stability', () => { + it('returns the same array reference on consecutive calls without intervening changes', () => { + tree.upsert('m1', { id: '1', content: 'hi' }, makeHeaders('m1'), 'serial-1'); + + const ref1 = view.flattenNodes(); + const ref2 = view.flattenNodes(); + + // flattenNodes() should return a cached result - the same array + // reference - when nothing has changed between calls. + expect(ref2).toBe(ref1); + }); + + it('does not re-walk the tree during a content-only message update', () => { + tree.upsert('m1', { id: '1', content: 'first' }, makeHeaders('m1'), 'serial-1'); + tree.upsert('m2', { id: '2', content: 'second' }, makeHeaders('m2', 'turn-1'), 'serial-2'); + + // Capture the current cached state so the view has a baseline + view.flattenNodes(); + + const spy = vi.spyOn(tree, 'flattenNodes'); + spy.mockClear(); + + // Content-only update: same msgId, different message content, no serial change + tree.upsert('m2', { id: '2', content: 'streaming token' }, makeHeaders('m2', 'turn-1'), 'serial-2'); + + // The view should detect this is a content-only update and skip the + // full tree walk - using the cached node list instead. + expect(spy).not.toHaveBeenCalled(); + + spy.mockRestore(); + }); + + it('preserves unchanged message references after a content-only update', () => { + const msg1 = { id: '1', content: 'stable' }; + const msg2 = { id: '2', content: 'will-change' }; + tree.upsert('m1', msg1, makeHeaders('m1'), 'serial-1'); + tree.upsert('m2', msg2, makeHeaders('m2'), 'serial-2'); + + const before = view.flattenNodes(); + const msg1RefBefore = before[0]?.message; + + // Content-only update to m2 only + tree.upsert('m2', { id: '2', content: 'changed' }, makeHeaders('m2'), 'serial-2'); + + const after = view.flattenNodes(); + + // m1's message reference should be preserved (identical object) + expect(after[0]?.message).toBe(msg1RefBefore); + // m2's message reference should differ (content changed) + expect(after[1]?.message).not.toBe(msg2); + expect(after[1]?.message).toEqual({ id: '2', content: 'changed' }); + }); + + it('returns a new array reference after a content-only update so React detects the change', () => { + tree.upsert('m1', { id: '1', content: 'hi' }, makeHeaders('m1'), 'serial-1'); + + const before = view.flattenNodes(); + + tree.upsert('m1', { id: '1', content: 'updated' }, makeHeaders('m1'), 'serial-1'); + + const after = view.flattenNodes(); + // The array itself must be a new reference (so React state updates trigger), + // even though the tree structure hasn't changed. + expect(after).not.toBe(before); + }); + + it('simulated streaming: only the active message reference changes per token', () => { + // Set up a conversation with 3 messages, then simulate token-by-token + // streaming updates to the last message (m3). Only m3's message + // reference should change; m1 and m2 should remain stable. + const msg1 = { id: '1', content: 'user msg' }; + const msg2 = { id: '2', content: 'assistant msg' }; + tree.upsert('m1', msg1, makeHeaders('m1'), 'serial-1'); + tree.upsert('m2', msg2, { [HEADER_MSG_ID]: 'm2', 'x-ably-parent': 'm1' }, 'serial-2'); + tree.upsert('m3', { id: '3', content: '' }, { [HEADER_MSG_ID]: 'm3', 'x-ably-parent': 'm2' }, 'serial-3'); + + const snap0 = view.flattenNodes(); + const m1Ref0 = snap0[0]?.message; + const m2Ref0 = snap0[1]?.message; + + // Simulate 3 streaming tokens updating m3 + const tokens = ['Hello', 'Hello world', 'Hello world!']; + for (const token of tokens) { + tree.upsert('m3', { id: '3', content: token }, {}, 'serial-3'); + + const snap = view.flattenNodes(); + // m1 and m2 references must remain the same object + expect(snap[0]?.message).toBe(m1Ref0); + expect(snap[1]?.message).toBe(m2Ref0); + // m3 content must reflect the latest token + expect(snap[2]?.message.content).toBe(token); + } + }); + }); }); diff --git a/test/react/use-view.test.ts b/test/react/use-view.test.ts index 89a2910..cd1a16b 100644 --- a/test/react/use-view.test.ts +++ b/test/react/use-view.test.ts @@ -180,4 +180,36 @@ describe('useView', () => { expect(result.current.nodes).toHaveLength(1); expect(result.current.messages).toEqual(['hello']); }); + + // --------------------------------------------------------------------------- + // Reference stability during streaming + // --------------------------------------------------------------------------- + + it('preserves message references for unchanged messages during streaming update', () => { + const msg1 = 'stable-message'; + const msg2 = 'streaming-message'; + const mock = createMockTransport([msg1, msg2]); + const { result } = renderHook(() => useView(mock.transport)); + + // Verify initial messages + expect(result.current.messages[0]).toBe(msg1); + expect(result.current.messages[1]).toBe(msg2); + + // Simulate streaming update: msg2 changes, msg1 stays (same reference) + const msg2Updated = 'streaming-message-updated'; + const updatedNodes = [ + { message: msg1, msgId: 'msg-0', parentId: undefined, forkOf: undefined, headers: {}, serial: undefined }, + { message: msg2Updated, msgId: 'msg-1', parentId: undefined, forkOf: undefined, headers: {}, serial: undefined }, + ]; + (mock.view.flattenNodes as ReturnType).mockReturnValue(updatedNodes); + + act(() => { + mock.emitTree('update'); + }); + + // msg1's reference should be preserved - same string object + expect(result.current.messages[0]).toBe(msg1); + // msg2's reference should be the new value + expect(result.current.messages[1]).toBe(msg2Updated); + }); }); diff --git a/test/vercel/react/use-message-sync.test.ts b/test/vercel/react/use-message-sync.test.ts index d3402bf..dc6e297 100644 --- a/test/vercel/react/use-message-sync.test.ts +++ b/test/vercel/react/use-message-sync.test.ts @@ -22,6 +22,15 @@ const makeMessage = (id: string, role: AI.UIMessage['role'] = 'user'): AI.UIMess parts: [], }); +const makeNode = (m: AI.UIMessage) => ({ + message: m, + msgId: m.id, + parentId: undefined, + forkOf: undefined, + headers: {}, + serial: undefined, +}); + const createMockTransport = (): MockTransport => { const viewHandlers = new Map>(); @@ -300,4 +309,46 @@ describe('useMessageSync', () => { expect(gateOpenUpdater([])).toEqual([userMsg, observerMsg]); }); }); + + // --------------------------------------------------------------------------- + // Reference stability during streaming + // --------------------------------------------------------------------------- + + it('preserves unchanged message references across streaming updates', () => { + const mock = createMockTransport(); + const msg1 = makeMessage('1'); + const msg2 = makeMessage('2', 'assistant'); + + mock.viewFlattenNodes.mockReturnValue([makeNode(msg1), makeNode(msg2)]); + + const setMessages = vi.fn(); + renderHook(() => { + useMessageSync(mock.transport, setMessages); + }); + + // First update - populates messages + act(() => { + mock.emitView('update'); + }); + + // msg2 gets updated content (new reference), msg1 stays same + const msg2Updated = makeMessage('2', 'assistant'); + msg2Updated.parts = [{ type: 'text', text: 'Hello' }]; + mock.viewFlattenNodes.mockReturnValue([makeNode(msg1), makeNode(msg2Updated)]); + + // Second update - streaming token + act(() => { + mock.emitView('update'); + }); + + // Extract the messages produced by the second update's updater + // CAST: setMessages receives an updater function from useMessageSync + const updater = setMessages.mock.calls[1]?.[0] as (prev: AI.UIMessage[]) => AI.UIMessage[]; + const result = updater([]); + + // msg1 should be the exact same reference (not cloned) + expect(result[0]).toBe(msg1); + // msg2 should be the new reference + expect(result[1]).toBe(msg2Updated); + }); });