Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/core/transport/decode-history.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* decodeHistory — load conversation history from an Ably channel and
* return decoded messages as a PaginatedMessages result.
* return decoded messages as a paginated HistoryPage result.
*
* Uses a fresh decoder (not shared with the live subscription) to avoid
* state conflicts. Per-turn accumulators handle interleaved turns correctly.
Expand All @@ -26,7 +26,7 @@ import { HEADER_AMEND, HEADER_MSG_ID, HEADER_TURN_ID } from '../../constants.js'
import type { Logger } from '../../logger.js';
import { getHeaders } from '../../utils.js';
import type { Codec, DecoderOutput, MessageAccumulator } from '../codec/types.js';
import type { LoadHistoryOptions, PaginatedMessages } from './types.js';
import type { HistoryPage, LoadHistoryOptions } from './types.js';

// ---------------------------------------------------------------------------
// Shared state across pages within one history traversal
Expand Down Expand Up @@ -247,19 +247,16 @@ const fetchUntilLimit = async <TEvent, TMessage>(
};

// ---------------------------------------------------------------------------
// Build PaginatedMessages result from current state
// Build HistoryPage result from current state
// ---------------------------------------------------------------------------

/**
* Build a PaginatedMessages page from the current decode state.
* Build a HistoryPage from the current decode state.
* @param state - The shared history traversal state.
* @param limit - Max messages per page.
* @returns A page of decoded messages with a `next()` cursor.
* @returns A page of decoded history with a `next()` cursor.
*/
const buildResult = <TEvent, TMessage>(
state: HistoryState<TEvent, TMessage>,
limit: number,
): PaginatedMessages<TMessage> => {
const buildResult = <TEvent, TMessage>(state: HistoryState<TEvent, TMessage>, limit: number): HistoryPage<TMessage> => {
// allCompleted is newest-first. Slice from returnedCount for this page,
// then reverse to chronological for display.
const allCompleted = decodeAll(state);
Expand All @@ -277,9 +274,7 @@ const buildResult = <TEvent, TMessage>(
state.returnedRawCount = state.rawMessages.length;

return {
items: chronSlice.map((d) => d.message),
itemHeaders: chronSlice.map((d) => d.headers),
itemSerials: chronSlice.map((d) => d.serial),
items: chronSlice.map((d) => ({ message: d.message, headers: d.headers, serial: d.serial })),
rawMessages: rawSlice,
hasNext: () => moreCompleted || moreAblyPages,
next: async () => {
Expand Down Expand Up @@ -312,15 +307,15 @@ const buildResult = <TEvent, TMessage>(
* @param codec - The codec for decoding wire messages into domain messages.
* @param options - Pagination options.
* @param logger - Logger for diagnostic output.
* @returns The first page of decoded history messages.
* @returns The first page of decoded history.
*/
// Spec: AIT-CT11, AIT-CT11b
export const decodeHistory = async <TEvent, TMessage>(
channel: Ably.RealtimeChannel,
codec: Codec<TEvent, TMessage>,
options: LoadHistoryOptions | undefined,
logger: Logger,
): Promise<PaginatedMessages<TMessage>> => {
): Promise<HistoryPage<TMessage>> => {
const limit = options?.limit ?? 100;
const state: HistoryState<TEvent, TMessage> = {
codec,
Expand Down
26 changes: 16 additions & 10 deletions src/core/transport/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,20 +336,26 @@ export interface CloseOptions {
// History / pagination
// ---------------------------------------------------------------------------

/** A page of decoded messages from channel history. */
export interface PaginatedMessages<TMessage> {
/** Decoded messages in chronological order (oldest first). */
items: TMessage[];
/** Headers for each item, parallel to `items`. Used by the transport to populate the tree. */
itemHeaders?: Record<string, string>[];
/** Ably serial for each item, parallel to `items`. Used by the transport for tree ordering. */
itemSerials?: string[];
/** A single decoded history item with its transport metadata. */
export interface HistoryItem<TMessage> {
/** The decoded domain message. */
message: TMessage;
/** Transport headers for tree identity and ordering. */
headers: Record<string, string>;
/** Ably serial for tree ordering. */
serial: string;
}

/** A page of decoded history from the channel. Internal to View/decodeHistory. */
export interface HistoryPage<TMessage> {
/** Decoded items in chronological order (oldest first). */
items: HistoryItem<TMessage>[];
/** Raw Ably messages that produced this page, in chronological order. */
rawMessages?: Ably.InboundMessage[];
rawMessages: Ably.InboundMessage[];
/** Whether there are older pages available. */
hasNext(): boolean;
/** Fetch the next (older) page. Returns undefined if no more pages. */
next(): Promise<PaginatedMessages<TMessage> | undefined>;
next(): Promise<HistoryPage<TMessage> | undefined>;
}

/** Options for loading channel history. */
Expand Down
30 changes: 13 additions & 17 deletions src/core/transport/view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import type { TreeInternal } from './tree.js';
import type {
ActiveTurn,
EventsNode,
HistoryPage,
MessageNode,
PaginatedMessages,
SendOptions,
TurnLifecycleEvent,
View,
Expand Down Expand Up @@ -137,7 +137,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
private _hasMoreHistory = false;

/** Internal state for continuing history pagination. */
private _lastHistoryPage: PaginatedMessages<TMessage> | undefined;
private _lastHistoryPage: HistoryPage<TMessage> | undefined;

/** Buffer of withheld nodes, drained newest-first by successive loadOlder() calls. */
private readonly _withheldBuffer: MessageNode<TMessage>[] = [];
Expand Down Expand Up @@ -197,8 +197,8 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {

async loadOlder(limit = 100): Promise<void> {
if (this._closed || this._loadingOlder) return;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be outside of the scope of this PR, but this looks weird to me: if you call loadOlder twice the second promise returned will resolve immediately even though the first promise is still busy loading older messages. Would it be better to reject the promise in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really an error though so I don't think reject is a good idea, it's just to stop concurrent calls to loadOlder. When the first one resolves the data will be available.

IMO it's not really a problem, but if we think it is, then I'd say we return a promise that resolves when the inflight call to loadOlder resolves.

this._logger.trace('DefaultView.loadOlder();', { limit });
this._loadingOlder = true;
this._logger.trace('DefaultView.loadOlder();', { limit });

try {
// Drain withheld buffer first (older messages, released newest-first)
Expand Down Expand Up @@ -472,6 +472,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
close(): void {
this._logger.info('DefaultView.close();');
this._closed = true;
this._loadingOlder = false;
for (const unsub of this._unsubs) unsub();
this._unsubs.length = 0;
this._emitter.off();
Expand Down Expand Up @@ -511,7 +512,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
this._releaseWithheld(released);
}

private async _loadAndReveal(page: PaginatedMessages<TMessage>, limit: number): Promise<void> {
private async _loadAndReveal(page: HistoryPage<TMessage>, limit: number): Promise<void> {
// Everything currently in the tree is "already known"
const alreadyKnown = new Set(this._tree.flattenNodes(this._resolveSelections()).map((n) => n.msgId));

Expand All @@ -533,33 +534,28 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
this._releaseWithheld(batch);
}

private _processHistoryPage(page: PaginatedMessages<TMessage>): void {
private _processHistoryPage(page: HistoryPage<TMessage>): void {
this._processingHistory = true;
try {
for (const [i, message] of page.items.entries()) {
const headers = page.itemHeaders?.[i] ?? {};
const serial = page.itemSerials?.[i];
const msgId = headers[HEADER_MSG_ID];
for (const item of page.items) {
const msgId = item.headers[HEADER_MSG_ID];
if (!msgId) continue;
this._tree.upsert(msgId, message, headers, serial);
this._tree.upsert(msgId, item.message, item.headers, item.serial);
}

// Forward raw Ably messages through the tree
if (page.rawMessages && page.rawMessages.length > 0) {
for (const msg of page.rawMessages) {
this._tree.emitAblyMessage(msg);
}
for (const msg of page.rawMessages) {
this._tree.emitAblyMessage(msg);
}
} finally {
this._processingHistory = false;
}
}

private async _loadUntilVisible(
firstPage: PaginatedMessages<TMessage>,
firstPage: HistoryPage<TMessage>,
target: number,
beforeMsgIds: Set<string>,
): Promise<{ newVisible: MessageNode<TMessage>[]; lastPage: PaginatedMessages<TMessage> }> {
): Promise<{ newVisible: MessageNode<TMessage>[]; lastPage: HistoryPage<TMessage> }> {
this._processHistoryPage(firstPage);
let page = firstPage;

Expand Down
43 changes: 36 additions & 7 deletions test/core/transport/view.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { Codec } from '../../../src/core/codec/types.js';
import { decodeHistory } from '../../../src/core/transport/decode-history.js';
import type { DefaultTree } from '../../../src/core/transport/tree.js';
import { createTree } from '../../../src/core/transport/tree.js';
import type { MessageNode, PaginatedMessages, SendOptions, TurnLifecycleEvent } from '../../../src/core/transport/types.js';
import type { HistoryPage, MessageNode, SendOptions, TurnLifecycleEvent } from '../../../src/core/transport/types.js';
import type { SendDelegate } from '../../../src/core/transport/view.js';
import { DefaultView } from '../../../src/core/transport/view.js';
import { LogLevel, makeLogger } from '../../../src/logger.js';
Expand Down Expand Up @@ -64,14 +64,16 @@ const makePage = (
headers: Record<string, string>[],
serials: string[],
hasNextPage = false,
nextPageFn?: () => Promise<PaginatedMessages<TestMessage> | undefined>,
): PaginatedMessages<TestMessage> => ({
items,
itemHeaders: headers,
itemSerials: serials,
nextPageFn?: () => Promise<HistoryPage<TestMessage> | undefined>,
): HistoryPage<TestMessage> => ({
items: items.map((message, i) => ({
message,
headers: headers[i] ?? {},
serial: serials[i] ?? '',
})),
rawMessages: [],
hasNext: () => hasNextPage,
// eslint-disable-next-line @typescript-eslint/promise-function-async, unicorn/no-useless-undefined -- mock needs explicit undefined for PaginatedMessages return type
// eslint-disable-next-line @typescript-eslint/promise-function-async, unicorn/no-useless-undefined -- mock needs explicit undefined for HistoryPage return type
next: nextPageFn ?? (() => Promise.resolve(undefined)),
});

Expand Down Expand Up @@ -464,6 +466,33 @@ describe('DefaultView', () => {
expect(view.flattenNodes()).toHaveLength(5);
});

it('ignores concurrent loadOlder calls', async () => {
let resolveFirst: ((page: HistoryPage<TestMessage>) => void) | undefined;
const firstPromise = new Promise<HistoryPage<TestMessage>>((r) => {
resolveFirst = r;
});
vi.mocked(decodeHistory).mockReturnValue(firstPromise);

const page = makePage(
[{ id: '1', content: 'a' }, { id: '2', content: 'b' }],
[makeHeaders('h1'), makeHeaders('h2')],
['serial-1', 'serial-2'],
);

// Start two concurrent loadOlder calls
const first = view.loadOlder(10);
const second = view.loadOlder(10);

// Resolve the first — the second should have been a no-op
if (resolveFirst) resolveFirst(page);
await first;
await second;

// decodeHistory should only be called once
expect(vi.mocked(decodeHistory)).toHaveBeenCalledOnce();
expect(view.flattenNodes()).toHaveLength(2);
});

it('suppresses ably-message events for withheld nodes', async () => {
const page = makePage(
[{ id: '1', content: 'a' }, { id: '2', content: 'b' }, { id: '3', content: 'c' }],
Expand Down
Loading