Skip to content

Commit 6ad630f

Browse files
mschristensenclaude
andcommitted
transport: fix multi-view selection stability and streaming visibility
Addresses several issues found during design review: - Add selection pinning: when a new fork appears, views that were already showing the original message pin their selection to it, preventing unintended branch shifts from other views' edits. Views that initiate a fork auto-select the new sibling via send(). - Fix streaming visibility: _visibleChanged now compares message references alongside msgIds, so in-place content updates (streaming accumulation) trigger view 'update' events for observers. - Fix ably-message filtering: _onTreeAblyMessage now checks that the msgId is in the visible set, not just not-withheld. Messages on non-selected branches are no longer forwarded. - Validate messageId in regenerate/edit: throw Ably.ErrorInfo when the target node doesn't exist in the tree. - Add getSiblingIndex() to TreeInternal for selection pinning. - Add constructor trace log, warn on _getHistoryBefore fallback, comment on ChatTransport's default-view assumption. Tests cover pinning stability, fork auto-select, streaming emit, off-branch ably-message suppression, unknown messageId validation, and loadOlder on a closed view. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9a853a1 commit 6ad630f

4 files changed

Lines changed: 244 additions & 31 deletions

File tree

src/core/transport/tree.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ export interface TreeInternal<TMessage> extends Tree<TMessage> {
5353
*/
5454
getGroupRoot(msgId: string): string;
5555

56+
/**
57+
* Get the index of a specific msgId within its sibling group.
58+
* Returns -1 if the msgId is not found.
59+
*/
60+
getSiblingIndex(msgId: string): number;
61+
5662
/** Forward a raw Ably message event to tree subscribers. */
5763
emitAblyMessage(msg: Ably.InboundMessage): void;
5864
/** Forward a turn lifecycle event to tree subscribers. */
@@ -344,6 +350,10 @@ export class DefaultTree<TMessage> implements TreeInternal<TMessage> {
344350
return this._getSiblingGroup(msgId).length > 1;
345351
}
346352

353+
getSiblingIndex(msgId: string): number {
354+
return this._getSiblingGroup(msgId).findIndex((n) => n.msgId === msgId);
355+
}
356+
347357
getNode(msgId: string): TreeNode<TMessage> | undefined {
348358
this._logger.trace('DefaultTree.getNode();', { msgId });
349359
return this._nodeIndex.get(msgId)?.node;

src/core/transport/view.ts

Lines changed: 87 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
* visible nodes, and 'turn' only for turns with visible messages.
1414
*/
1515

16-
import type * as Ably from 'ably';
16+
import * as Ably from 'ably';
1717

1818
import { HEADER_MSG_ID, HEADER_TURN_ID } from '../../constants.js';
19+
import { ErrorCode } from '../../errors.js';
1920
import { EventEmitter } from '../../event-emitter.js';
2021
import type { Logger } from '../../logger.js';
2122
import { getHeaders } from '../../utils.js';
@@ -96,9 +97,12 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
9697
/** Msg-ids loaded from history but not yet revealed to the UI. */
9798
private readonly _withheldMsgIds = new Set<string>();
9899

99-
/** Snapshot of visible msgIds — used to detect whether tree updates affect the view. */
100+
/** Snapshot of visible msgIds — used to detect structural changes and for selection pinning. */
100101
private _lastVisibleIds: string[] = [];
101102

103+
/** Snapshot of visible message references — used to detect in-place content updates (streaming). */
104+
private _lastVisibleMessages: TMessage[] = [];
105+
102106
/** Whether there are more history pages to fetch from the channel. */
103107
private _hasMoreHistory = false;
104108

@@ -119,10 +123,11 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
119123
this._codec = options.codec;
120124
this._sendDelegate = options.sendDelegate;
121125
this._logger = options.logger.withContext({ component: 'View' });
126+
this._logger.trace('DefaultView();');
122127
this._emitter = new EventEmitter<ViewEventsMap>(this._logger);
123128

124129
// Snapshot initial visible state
125-
this._lastVisibleIds = this._computeVisibleIds();
130+
this._updateVisibleSnapshot();
126131

127132
// Subscribe to tree events and re-emit scoped versions
128133
this._unsubs.push(
@@ -242,15 +247,37 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
242247
// Spec: AIT-CT3, AIT-CT4
243248
async send(input: TMessage | TMessage[], options?: SendOptions): Promise<ActiveTurn<TEvent>> {
244249
this._logger.trace('DefaultView.send();');
245-
return this._sendDelegate(input, options, { flattenNodes: () => this.flattenNodes() });
250+
const result = await this._sendDelegate(input, options, { flattenNodes: () => this.flattenNodes() });
251+
252+
// Auto-select the new fork in this view when creating a fork.
253+
// The delegate's optimistic insert created the sibling — pin this
254+
// view to the latest (new) sibling so the user sees their edit/regeneration.
255+
if (options?.forkOf) {
256+
const groupRoot = this._tree.getGroupRoot(options.forkOf);
257+
const siblings = this._tree.getSiblings(options.forkOf);
258+
if (siblings.length > 1) {
259+
this._selections.set(groupRoot, siblings.length - 1);
260+
this._updateVisibleSnapshot();
261+
this._emitter.emit('update');
262+
}
263+
}
264+
265+
return result;
246266
}
247267

248268
// Spec: AIT-CT5
249269
async regenerate(messageId: string, options?: SendOptions): Promise<ActiveTurn<TEvent>> {
250270
this._logger.trace('DefaultView.regenerate();', { messageId });
251271

252272
const node = this._tree.getNode(messageId);
253-
const parentId = node?.parentId;
273+
if (!node) {
274+
throw new Ably.ErrorInfo(
275+
`unable to regenerate; message not found in tree: ${messageId}`,
276+
ErrorCode.InvalidArgument,
277+
400,
278+
);
279+
}
280+
const parentId = node.parentId;
254281

255282
return this.send([], {
256283
...options,
@@ -272,7 +299,14 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
272299
this._logger.trace('DefaultView.edit();', { messageId });
273300

274301
const node = this._tree.getNode(messageId);
275-
const parentId = node?.parentId;
302+
if (!node) {
303+
throw new Ably.ErrorInfo(
304+
`unable to edit; message not found in tree: ${messageId}`,
305+
ErrorCode.InvalidArgument,
306+
400,
307+
);
308+
}
309+
const parentId = node.parentId;
276310

277311
return this.send(newMessages, {
278312
...options,
@@ -289,7 +323,13 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
289323
this._logger.trace('DefaultView._getHistoryBefore();', { messageId });
290324
const all = this.flattenNodes();
291325
const idx = all.findIndex((n) => n.msgId === messageId);
292-
return idx === -1 ? all : all.slice(0, idx);
326+
if (idx === -1) {
327+
this._logger.warn('DefaultView._getHistoryBefore(); target not in visible nodes, returning full list', {
328+
messageId,
329+
});
330+
return all;
331+
}
332+
return all.slice(0, idx);
293333
}
294334

295335
// -------------------------------------------------------------------------
@@ -456,22 +496,50 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
456496
// Private: scoped event forwarding
457497
// -------------------------------------------------------------------------
458498

459-
private _computeVisibleIds(): string[] {
460-
return this.flattenNodes().map((n) => n.msgId);
499+
private _computeVisibleNodes(): TreeNode<TMessage>[] {
500+
return this.flattenNodes();
461501
}
462502

463503
private _updateVisibleSnapshot(): void {
464-
this._lastVisibleIds = this._computeVisibleIds();
504+
const nodes = this._computeVisibleNodes();
505+
this._lastVisibleIds = nodes.map((n) => n.msgId);
506+
this._lastVisibleMessages = nodes.map((n) => n.message);
465507
}
466508

467509
private _onTreeUpdate(): void {
468-
const newIds = this._computeVisibleIds();
469-
if (this._visibleChanged(newIds)) {
510+
// Pin selections for previously-visible nodes that now have siblings.
511+
// This prevents new forks (from other views' edits/regenerates) from
512+
// shifting this view to a branch the user didn't navigate to.
513+
this._pinVisibleSelections();
514+
515+
const nodes = this._computeVisibleNodes();
516+
const newIds = nodes.map((n) => n.msgId);
517+
const newMessages = nodes.map((n) => n.message);
518+
if (this._visibleChanged(newIds, newMessages)) {
470519
this._lastVisibleIds = newIds;
520+
this._lastVisibleMessages = newMessages;
471521
this._emitter.emit('update');
472522
}
473523
}
474524

525+
/**
526+
* For each previously-visible message that now has siblings but no
527+
* explicit selection, pin the selection to that message's index.
528+
* This preserves the current branch when new forks appear from
529+
* other views or external sources.
530+
*/
531+
private _pinVisibleSelections(): void {
532+
for (const msgId of this._lastVisibleIds) {
533+
if (!this._tree.hasSiblings(msgId)) continue;
534+
const groupRoot = this._tree.getGroupRoot(msgId);
535+
if (this._selections.has(groupRoot)) continue;
536+
const idx = this._tree.getSiblingIndex(msgId);
537+
if (idx >= 0) {
538+
this._selections.set(groupRoot, idx);
539+
}
540+
}
541+
}
542+
475543
private _onTreeAblyMessage(msg: Ably.InboundMessage): void {
476544
// Re-emit only if the message corresponds to a visible node
477545
const headers = getHeaders(msg);
@@ -481,7 +549,8 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
481549
this._emitter.emit('ably-message', msg);
482550
return;
483551
}
484-
if (!this._withheldMsgIds.has(msgId)) {
552+
// Check that msgId is on the visible branch and not withheld
553+
if (this._lastVisibleIds.includes(msgId)) {
485554
this._emitter.emit('ably-message', msg);
486555
}
487556
}
@@ -498,11 +567,15 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
498567
}
499568
}
500569

501-
private _visibleChanged(newIds: string[]): boolean {
570+
private _visibleChanged(newIds: string[], newMessages: TMessage[]): boolean {
502571
if (newIds.length !== this._lastVisibleIds.length) return true;
503572
for (const [i, newId] of newIds.entries()) {
504573
if (newId !== this._lastVisibleIds[i]) return true;
505574
}
575+
// Also detect in-place content updates (e.g. streaming) via reference comparison
576+
for (const [i, msg] of newMessages.entries()) {
577+
if (msg !== this._lastVisibleMessages[i]) return true;
578+
}
506579
return false;
507580
}
508581
}

src/vercel/transport/chat-transport.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ export const createChatTransport = (
182182
// Look up the parent of the message being regenerated.
183183
// messageId comes from useChat (UIMessage.id) — scan the flattened
184184
// nodes to find the one whose domain message matches this ID.
185+
// Uses the transport's default view — ChatTransport is single-view (one useChat per channel).
185186
const node = transport.view.flattenNodes().find((n) => n.message.id === messageId);
186187
if (node) {
187188
// Use the tree node's msgId (x-ably-msg-id) as forkOf — this is

0 commit comments

Comments
 (0)