Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions docs/internals/conversation-tree.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ Note that serial order is not necessarily delivery order - messages published co
## Data structures

```
_nodeIndex: Map<msgId, InternalNode> Primary index
_sortedList: InternalNode[] All nodes, sorted by serial
_parentIndex: Map<parentId, Set<msgId>> Children of each parent
_selections: Map<groupRootId, index> Selected sibling at each fork
_nodeIndex: Map<msgId, InternalNode> Primary index
_sortedList: InternalNode[] All nodes, sorted by serial
_parentIndex: Map<parentId, Set<msgId>> Children of each parent
_selections: Map<groupRootId, index> Selected sibling at each fork
_structuralVersion: number Monotonic counter (see below)
```

Each `MessageNode` stores:
Expand Down Expand Up @@ -52,6 +53,10 @@ Each `MessageNode` stores:

Serial promotion handles the common case where a client inserts an optimistic message (null serial), then the server publishes it to the channel (with serial). The node moves from the end of the sorted list to its correct serial-order position.

### Structural version

The tree maintains a `structuralVersion` counter (exposed via `TreeInternal`) that increments on changes affecting the `flattenNodes()` output structure - node inserts, deletions, and serial promotions (which reorder the sorted list). Content-only updates (replacing an existing node's message) do not increment the counter. The [View](message-lifecycle.md#cached-message-list) uses this to skip full tree walks during streaming - when only message content changed, the cached node list is still structurally valid.

## Sibling groups and fork chains

When a user calls `regenerate(msgId)` or `edit(msgId)`, the new message carries an [`x-ably-fork-of`](wire-protocol.md#branching-headers) header pointing to `msgId`. Messages that fork the same target (or transitively fork each other) form a **sibling group** - alternative messages at the same point in the conversation.
Expand Down
4 changes: 2 additions & 2 deletions docs/internals/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ A codec-provided component that assembles [decoder outputs](decoder.md#decoder-o

### Message materialization

The act of producing a flat message list from the [conversation tree](conversation-tree.md) via [`flattenNodes()`](#flatten). `flattenNodes()` returns `MessageNode<TMessage>[]`. Every call rebuilds from scratch - there is no cached list - because the result depends on branch selection state. All consumers go through the view's `flattenNodes()`: React hooks, `send()` (for the HTTP POST body), `view.loadOlder()` (for pagination snapshots). See [Message lifecycle](message-lifecycle.md#why-no-cached-message-list).
The act of producing a flat message list from the [conversation tree](conversation-tree.md) via [`flattenNodes()`](#flatten). `flattenNodes()` returns `MessageNode<TMessage>[]`. The View caches the result and returns it in O(1) on subsequent calls. The cache is refreshed when the tree structure changes (new nodes, deletions, selection changes, history reveal). All consumers go through the view's `flattenNodes()`: React hooks, `send()` (for the HTTP POST body), `view.loadOlder()` (for pagination snapshots). See [Message lifecycle](message-lifecycle.md#cached-message-list).

### Flatten

`view.flattenNodes()` - the sole path from tree state to a message array. (`flattenNodes()` is internal to `TreeInternal`, not on the public `Tree` interface.) Walks the sorted node list, checks parent reachability and sibling selection, and returns the linear message sequence for the currently selected conversation path. See [Conversation tree: flatten](conversation-tree.md#flatten-producing-the-linear-path).
`view.flattenNodes()` - the sole path from tree state to a message array. Returns the View's cached node list in O(1). The cache is rebuilt by an internal `_computeFlatNodes()` method that walks the sorted node list, checks parent reachability and sibling selection, and produces the linear message sequence for the currently selected conversation path. (`flattenNodes()` on `TreeInternal` does the actual tree walk; the View's public method returns cached results.) See [Conversation tree: flatten](conversation-tree.md#flatten-producing-the-linear-path).
20 changes: 16 additions & 4 deletions docs/internals/message-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,23 @@ Every `'update'` event triggers a full `flattenNodes()` call, which rebuilds the

Each turn needs its own accumulator because events from interleaved concurrent turns would corrupt each other's message assembly - a text-delta from turn A would be accumulated into turn B's message.

## Why no cached message list
## Cached message list

The tree is a DAG with branch selection state. The "current conversation" depends on which sibling is selected at each fork point. There is no single cached `TMessage[]` - every call to `flattenNodes()` rebuilds from scratch.
The View caches the result of `flattenNodes()` in a `_cachedNodes` field. The public `flattenNodes()` method returns this cache in O(1). The cache is refreshed by `_computeFlatNodes()` - a private method that performs the actual tree walk - whenever the visible output may have changed:

This is a deliberate tradeoff: no cache invalidation complexity, at the cost of repeated traversals. Since message counts are conversation-sized (tens to low hundreds), this is cheap.
| Trigger | What refreshes the cache |
| ------------------------------------------------------------- | ----------------------------------------------------- |
| Tree structural change (new node, deletion, serial promotion) | `_onTreeUpdate()` calls `_computeFlatNodes()` |
| Content-only update (streaming token) | `_onTreeUpdate()` shallow-copies the cached array |
| Branch selection change | `select()` calls `_computeFlatNodes()` |
| Fork auto-selection after `send()` | `send()` auto-select path calls `_computeFlatNodes()` |
| History page revealed | `_releaseWithheld()` calls `_computeFlatNodes()` |

All consumers go through `view.flattenNodes()`:
### Content-only fast path

The tree exposes a [`structuralVersion`](conversation-tree.md#structural-version) counter that increments on insert, delete, and serial promotion - but not on content-only message updates. When `_onTreeUpdate()` sees the version unchanged, it skips the full tree walk entirely. The cached node list is still structurally valid because only a message reference changed on an existing `MessageNode`. The View compares each cached node's `.message` against the last-seen snapshot to detect which message changed, creates a new array reference (`[...cache]`) so React sees a state change, and emits `'update'`. This reduces the streaming hot path from O(total_nodes) to O(visible_count).

All consumers go through the cached `view.flattenNodes()`:

| Consumer | When it calls `flattenNodes()` |
| --------------------------- | ------------------------------------------------- |
Expand All @@ -133,4 +143,6 @@ All consumers go through `view.flattenNodes()`:
| `send()` / `regenerate()` | To build the HTTP POST body's message history |
| `view.loadOlder()` | To snapshot the current tree state for pagination |

Because all consumers read the cache, a structural tree update triggers one tree walk (inside the View), not one per consumer. Content-only updates (streaming tokens) trigger zero tree walks - only a reference comparison over visible messages. React hooks calling `flattenNodes()` after an `'update'` event get the pre-computed result without a redundant traversal.

See [Conversation tree](conversation-tree.md) for how `flattenNodes()` works. See [Codec interface](codec-interface.md#accumulator) for the accumulator's role. See [History hydration](history.md) for the history decode pipeline.
18 changes: 18 additions & 0 deletions src/core/transport/tree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ interface InternalNode<TMessage> {

/** Internal tree surface used by View — not part of the public Tree API. */
export interface TreeInternal<TMessage> extends Tree<TMessage> {
/**
* Monotonic counter that increments on structural changes (node insert,
* delete, serial promotion/reorder) but NOT on content-only updates
* (existing node's message replaced). Allows the View to skip full
* tree walks when only message content changed.
*/
readonly structuralVersion: number;

/**
* Flatten the tree along selected branches into a linear node list.
* The `selections` map provides the selected sibling's msgId at each
Expand Down Expand Up @@ -108,6 +116,13 @@ export class DefaultTree<TMessage> implements TreeInternal<TMessage> {
/** Monotonically increasing counter for insertion sequence. */
private _seqCounter = 0;

/** Incremented on structural changes; unchanged on content-only updates. */
private _structuralVersion = 0;

get structuralVersion(): number {
return this._structuralVersion;
}

constructor(logger: Logger) {
this._logger = logger;
this._emitter = new EventEmitter<TreeEventsMap>(logger);
Expand Down Expand Up @@ -394,6 +409,7 @@ export class DefaultTree<TMessage> implements TreeInternal<TMessage> {
// Re-sort: remove from current position, re-insert at correct position.
this._removeSorted(existing);
this._insertSorted(existing);
this._structuralVersion++;
}
this._emitter.emit('update');
return;
Expand All @@ -415,6 +431,7 @@ export class DefaultTree<TMessage> implements TreeInternal<TMessage> {
this._nodeIndex.set(msgId, internal);
this._addToParentIndex(parentId, msgId);
this._insertSorted(internal);
this._structuralVersion++;
this._emitter.emit('update');
}

Expand All @@ -437,6 +454,7 @@ export class DefaultTree<TMessage> implements TreeInternal<TMessage> {

// Children are NOT deleted — they become unreachable in flattenNodes()
// because their parent is no longer on the active path.
this._structuralVersion++;
this._emitter.emit('update');
}

Expand Down
60 changes: 54 additions & 6 deletions src/core/transport/view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
/** Unsubscribe functions for tree event subscriptions. */
private readonly _unsubs: (() => void)[] = [];

/**
* Cached result of the last flattenNodes computation. Public `flattenNodes()`
* returns this in O(1); internal callers use `_computeFlatNodes()` when a
* fresh tree walk is needed (structural changes, selection changes, history reveal).
*/
private _cachedNodes: MessageNode<TMessage>[] = [];

/** Last seen tree structural version - used to distinguish content-only from structural updates. */
private _lastStructuralVersion = -1;

private _loadingOlder = false;
private _processingHistory = false;
private _closed = false;
Expand All @@ -159,8 +169,10 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
this._logger.trace('DefaultView();');
this._emitter = new EventEmitter<ViewEventsMap>(this._logger);

// Snapshot initial visible state
this._updateVisibleSnapshot();
// Compute initial cache and snapshot visible state
this._cachedNodes = this._computeFlatNodes();
this._lastStructuralVersion = this._tree.structuralVersion;
this._updateVisibleSnapshot(this._cachedNodes);

// Subscribe to tree events and re-emit scoped versions
this._unsubs.push(
Expand All @@ -186,6 +198,17 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {

// Spec: AIT-CT9, AIT-CT11c
flattenNodes(): MessageNode<TMessage>[] {
return this._cachedNodes;
}

/**
* Walk the tree and compute a fresh visible node list, applying branch
* selections and withheld-message filtering. Use this instead of the
* public `flattenNodes()` when the cache may be stale (structural
* changes, selection changes, history reveal).
* @returns A fresh array of visible nodes.
*/
private _computeFlatNodes(): MessageNode<TMessage>[] {
const nodes = this._tree.flattenNodes(this._resolveSelections());
if (this._withheldMsgIds.size === 0) return nodes;
return nodes.filter((n) => !this._withheldMsgIds.has(n.msgId));
Expand Down Expand Up @@ -255,7 +278,8 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
if (!selected) return; // unreachable: clamped is always in bounds
this._branchSelections.set(groupRootId, { kind: 'user', selectedId: selected.msgId });
this._logger.debug('DefaultView.select();', { msgId, index: clamped, selectedId: selected.msgId });
this._updateVisibleSnapshot();
this._cachedNodes = this._computeFlatNodes();
this._updateVisibleSnapshot(this._cachedNodes);
this._emitter.emit('update');
}

Expand Down Expand Up @@ -311,7 +335,8 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
const lastMsgId = result.optimisticMsgIds.at(-1);
if (lastMsgId) {
this._branchSelections.set(groupRoot, { kind: 'auto', selectedId: lastMsgId });
this._updateVisibleSnapshot();
this._cachedNodes = this._computeFlatNodes();
this._updateVisibleSnapshot(this._cachedNodes);
this._emitter.emit('update');
}
} else {
Expand Down Expand Up @@ -584,7 +609,8 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
this._withheldMsgIds.delete(n.msgId);
}
if (nodes.length > 0) {
this._updateVisibleSnapshot();
this._cachedNodes = this._computeFlatNodes();
this._updateVisibleSnapshot(this._cachedNodes);
this._emitter.emit('update');
}
}
Expand Down Expand Up @@ -615,16 +641,38 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
// updates arriving during the async history fetch are still forwarded.
if (this._processingHistory) return;

const currentVersion = this._tree.structuralVersion;

// Content-only fast path: the tree structure hasn't changed (no new
// nodes, deletions, or serial reorders), so the cached node list is
// still structurally valid. The tree mutated an existing node's
// .message in place - check if any visible message reference changed.
// JS single-threaded: structuralVersion cannot change between the
// check and the response within this synchronous handler invocation.
if (currentVersion === this._lastStructuralVersion) {
const changed = this._cachedNodes.some((node, i) => node.message !== this._lastVisibleMessages[i]);
if (changed) {
this._lastVisibleMessages = this._cachedNodes.map((n) => n.message);
this._cachedNodes = [...this._cachedNodes];
this._emitter.emit('update');
}
return;
}

// Structural update: full re-walk required.
this._lastStructuralVersion = currentVersion;

// Pin selections for previously-visible nodes that now have siblings.
// This prevents new forks (from other views' edits/regenerates) from
// shifting this view to a branch the user didn't navigate to.
this._pinBranchSelections();
this._resolvePendingSelections();

const nodes = this.flattenNodes();
const nodes = this._computeFlatNodes();
const newIds = nodes.map((n) => n.msgId);
const newMessages = nodes.map((n) => n.message);
if (this._visibleChanged(newIds, newMessages)) {
this._cachedNodes = nodes;
this._updateVisibleSnapshot(nodes);
this._emitter.emit('update');
}
Expand Down
Loading
Loading