Skip to content

Commit eed5e51

Browse files
committed
Replace PaginatedMessages with HistoryPage
The PaginatedMessages interface used parallel arrays (items, itemHeaders, itemSerials, rawMessages) matched by index — fragile and wider than its sole internal consumer (View) needs. Since the type is no longer part of the public API, consolidate into HistoryItem (message + headers + serial per item) and HistoryPage. Also guard View.loadOlder() against concurrent calls — a second call while the first is still awaiting now silently no-ops instead of corrupting shared pagination state. AIT-670, AIT-671
1 parent 6ef8a77 commit eed5e51

File tree

4 files changed

+74
-48
lines changed

4 files changed

+74
-48
lines changed

src/core/transport/decode-history.ts

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* decodeHistory — load conversation history from an Ably channel and
3-
* return decoded messages as a PaginatedMessages result.
3+
* return decoded messages as a paginated HistoryPage result.
44
*
55
* Uses a fresh decoder (not shared with the live subscription) to avoid
66
* state conflicts. Per-turn accumulators handle interleaved turns correctly.
@@ -26,7 +26,7 @@ import { HEADER_AMEND, HEADER_MSG_ID, HEADER_TURN_ID } from '../../constants.js'
2626
import type { Logger } from '../../logger.js';
2727
import { getHeaders } from '../../utils.js';
2828
import type { Codec, DecoderOutput, MessageAccumulator } from '../codec/types.js';
29-
import type { LoadHistoryOptions, PaginatedMessages } from './types.js';
29+
import type { HistoryPage, LoadHistoryOptions } from './types.js';
3030

3131
// ---------------------------------------------------------------------------
3232
// Shared state across pages within one history traversal
@@ -247,19 +247,16 @@ const fetchUntilLimit = async <TEvent, TMessage>(
247247
};
248248

249249
// ---------------------------------------------------------------------------
250-
// Build PaginatedMessages result from current state
250+
// Build HistoryPage result from current state
251251
// ---------------------------------------------------------------------------
252252

253253
/**
254-
* Build a PaginatedMessages page from the current decode state.
254+
* Build a HistoryPage from the current decode state.
255255
* @param state - The shared history traversal state.
256256
* @param limit - Max messages per page.
257-
* @returns A page of decoded messages with a `next()` cursor.
257+
* @returns A page of decoded history with a `next()` cursor.
258258
*/
259-
const buildResult = <TEvent, TMessage>(
260-
state: HistoryState<TEvent, TMessage>,
261-
limit: number,
262-
): PaginatedMessages<TMessage> => {
259+
const buildResult = <TEvent, TMessage>(state: HistoryState<TEvent, TMessage>, limit: number): HistoryPage<TMessage> => {
263260
// allCompleted is newest-first. Slice from returnedCount for this page,
264261
// then reverse to chronological for display.
265262
const allCompleted = decodeAll(state);
@@ -277,9 +274,7 @@ const buildResult = <TEvent, TMessage>(
277274
state.returnedRawCount = state.rawMessages.length;
278275

279276
return {
280-
items: chronSlice.map((d) => d.message),
281-
itemHeaders: chronSlice.map((d) => d.headers),
282-
itemSerials: chronSlice.map((d) => d.serial),
277+
items: chronSlice.map((d) => ({ message: d.message, headers: d.headers, serial: d.serial })),
283278
rawMessages: rawSlice,
284279
hasNext: () => moreCompleted || moreAblyPages,
285280
next: async () => {
@@ -312,15 +307,15 @@ const buildResult = <TEvent, TMessage>(
312307
* @param codec - The codec for decoding wire messages into domain messages.
313308
* @param options - Pagination options.
314309
* @param logger - Logger for diagnostic output.
315-
* @returns The first page of decoded history messages.
310+
* @returns The first page of decoded history.
316311
*/
317312
// Spec: AIT-CT11, AIT-CT11b
318313
export const decodeHistory = async <TEvent, TMessage>(
319314
channel: Ably.RealtimeChannel,
320315
codec: Codec<TEvent, TMessage>,
321316
options: LoadHistoryOptions | undefined,
322317
logger: Logger,
323-
): Promise<PaginatedMessages<TMessage>> => {
318+
): Promise<HistoryPage<TMessage>> => {
324319
const limit = options?.limit ?? 100;
325320
const state: HistoryState<TEvent, TMessage> = {
326321
codec,

src/core/transport/types.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -336,20 +336,26 @@ export interface CloseOptions {
336336
// History / pagination
337337
// ---------------------------------------------------------------------------
338338

339-
/** A page of decoded messages from channel history. */
340-
export interface PaginatedMessages<TMessage> {
341-
/** Decoded messages in chronological order (oldest first). */
342-
items: TMessage[];
343-
/** Headers for each item, parallel to `items`. Used by the transport to populate the tree. */
344-
itemHeaders?: Record<string, string>[];
345-
/** Ably serial for each item, parallel to `items`. Used by the transport for tree ordering. */
346-
itemSerials?: string[];
339+
/** A single decoded history item with its transport metadata. */
340+
export interface HistoryItem<TMessage> {
341+
/** The decoded domain message. */
342+
message: TMessage;
343+
/** Transport headers for tree identity and ordering. */
344+
headers: Record<string, string>;
345+
/** Ably serial for tree ordering. */
346+
serial: string;
347+
}
348+
349+
/** A page of decoded history from the channel. Internal to View/decodeHistory. */
350+
export interface HistoryPage<TMessage> {
351+
/** Decoded items in chronological order (oldest first). */
352+
items: HistoryItem<TMessage>[];
347353
/** Raw Ably messages that produced this page, in chronological order. */
348-
rawMessages?: Ably.InboundMessage[];
354+
rawMessages: Ably.InboundMessage[];
349355
/** Whether there are older pages available. */
350356
hasNext(): boolean;
351357
/** Fetch the next (older) page. Returns undefined if no more pages. */
352-
next(): Promise<PaginatedMessages<TMessage> | undefined>;
358+
next(): Promise<HistoryPage<TMessage> | undefined>;
353359
}
354360

355361
/** Options for loading channel history. */

src/core/transport/view.ts

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import type { TreeInternal } from './tree.js';
2626
import type {
2727
ActiveTurn,
2828
EventsNode,
29+
HistoryPage,
2930
MessageNode,
30-
PaginatedMessages,
3131
SendOptions,
3232
TurnLifecycleEvent,
3333
View,
@@ -137,7 +137,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
137137
private _hasMoreHistory = false;
138138

139139
/** Internal state for continuing history pagination. */
140-
private _lastHistoryPage: PaginatedMessages<TMessage> | undefined;
140+
private _lastHistoryPage: HistoryPage<TMessage> | undefined;
141141

142142
/** Buffer of withheld nodes, drained newest-first by successive loadOlder() calls. */
143143
private readonly _withheldBuffer: MessageNode<TMessage>[] = [];
@@ -197,8 +197,8 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
197197

198198
async loadOlder(limit = 100): Promise<void> {
199199
if (this._closed || this._loadingOlder) return;
200-
this._logger.trace('DefaultView.loadOlder();', { limit });
201200
this._loadingOlder = true;
201+
this._logger.trace('DefaultView.loadOlder();', { limit });
202202

203203
try {
204204
// Drain withheld buffer first (older messages, released newest-first)
@@ -472,6 +472,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
472472
close(): void {
473473
this._logger.info('DefaultView.close();');
474474
this._closed = true;
475+
this._loadingOlder = false;
475476
for (const unsub of this._unsubs) unsub();
476477
this._unsubs.length = 0;
477478
this._emitter.off();
@@ -511,7 +512,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
511512
this._releaseWithheld(released);
512513
}
513514

514-
private async _loadAndReveal(page: PaginatedMessages<TMessage>, limit: number): Promise<void> {
515+
private async _loadAndReveal(page: HistoryPage<TMessage>, limit: number): Promise<void> {
515516
// Everything currently in the tree is "already known"
516517
const alreadyKnown = new Set(this._tree.flattenNodes(this._resolveSelections()).map((n) => n.msgId));
517518

@@ -533,33 +534,28 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
533534
this._releaseWithheld(batch);
534535
}
535536

536-
private _processHistoryPage(page: PaginatedMessages<TMessage>): void {
537+
private _processHistoryPage(page: HistoryPage<TMessage>): void {
537538
this._processingHistory = true;
538539
try {
539-
for (const [i, message] of page.items.entries()) {
540-
const headers = page.itemHeaders?.[i] ?? {};
541-
const serial = page.itemSerials?.[i];
542-
const msgId = headers[HEADER_MSG_ID];
540+
for (const item of page.items) {
541+
const msgId = item.headers[HEADER_MSG_ID];
543542
if (!msgId) continue;
544-
this._tree.upsert(msgId, message, headers, serial);
543+
this._tree.upsert(msgId, item.message, item.headers, item.serial);
545544
}
546545

547-
// Forward raw Ably messages through the tree
548-
if (page.rawMessages && page.rawMessages.length > 0) {
549-
for (const msg of page.rawMessages) {
550-
this._tree.emitAblyMessage(msg);
551-
}
546+
for (const msg of page.rawMessages) {
547+
this._tree.emitAblyMessage(msg);
552548
}
553549
} finally {
554550
this._processingHistory = false;
555551
}
556552
}
557553

558554
private async _loadUntilVisible(
559-
firstPage: PaginatedMessages<TMessage>,
555+
firstPage: HistoryPage<TMessage>,
560556
target: number,
561557
beforeMsgIds: Set<string>,
562-
): Promise<{ newVisible: MessageNode<TMessage>[]; lastPage: PaginatedMessages<TMessage> }> {
558+
): Promise<{ newVisible: MessageNode<TMessage>[]; lastPage: HistoryPage<TMessage> }> {
563559
this._processHistoryPage(firstPage);
564560
let page = firstPage;
565561

test/core/transport/view.test.ts

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { Codec } from '../../../src/core/codec/types.js';
77
import { decodeHistory } from '../../../src/core/transport/decode-history.js';
88
import type { DefaultTree } from '../../../src/core/transport/tree.js';
99
import { createTree } from '../../../src/core/transport/tree.js';
10-
import type { MessageNode, PaginatedMessages, SendOptions, TurnLifecycleEvent } from '../../../src/core/transport/types.js';
10+
import type { HistoryPage, MessageNode, SendOptions, TurnLifecycleEvent } from '../../../src/core/transport/types.js';
1111
import type { SendDelegate } from '../../../src/core/transport/view.js';
1212
import { DefaultView } from '../../../src/core/transport/view.js';
1313
import { LogLevel, makeLogger } from '../../../src/logger.js';
@@ -64,14 +64,16 @@ const makePage = (
6464
headers: Record<string, string>[],
6565
serials: string[],
6666
hasNextPage = false,
67-
nextPageFn?: () => Promise<PaginatedMessages<TestMessage> | undefined>,
68-
): PaginatedMessages<TestMessage> => ({
69-
items,
70-
itemHeaders: headers,
71-
itemSerials: serials,
67+
nextPageFn?: () => Promise<HistoryPage<TestMessage> | undefined>,
68+
): HistoryPage<TestMessage> => ({
69+
items: items.map((message, i) => ({
70+
message,
71+
headers: headers[i] ?? {},
72+
serial: serials[i] ?? '',
73+
})),
7274
rawMessages: [],
7375
hasNext: () => hasNextPage,
74-
// eslint-disable-next-line @typescript-eslint/promise-function-async, unicorn/no-useless-undefined -- mock needs explicit undefined for PaginatedMessages return type
76+
// eslint-disable-next-line @typescript-eslint/promise-function-async, unicorn/no-useless-undefined -- mock needs explicit undefined for HistoryPage return type
7577
next: nextPageFn ?? (() => Promise.resolve(undefined)),
7678
});
7779

@@ -464,6 +466,33 @@ describe('DefaultView', () => {
464466
expect(view.flattenNodes()).toHaveLength(5);
465467
});
466468

469+
it('ignores concurrent loadOlder calls', async () => {
470+
let resolveFirst: ((page: HistoryPage<TestMessage>) => void) | undefined;
471+
const firstPromise = new Promise<HistoryPage<TestMessage>>((r) => {
472+
resolveFirst = r;
473+
});
474+
vi.mocked(decodeHistory).mockReturnValue(firstPromise);
475+
476+
const page = makePage(
477+
[{ id: '1', content: 'a' }, { id: '2', content: 'b' }],
478+
[makeHeaders('h1'), makeHeaders('h2')],
479+
['serial-1', 'serial-2'],
480+
);
481+
482+
// Start two concurrent loadOlder calls
483+
const first = view.loadOlder(10);
484+
const second = view.loadOlder(10);
485+
486+
// Resolve the first — the second should have been a no-op
487+
if (resolveFirst) resolveFirst(page);
488+
await first;
489+
await second;
490+
491+
// decodeHistory should only be called once
492+
expect(vi.mocked(decodeHistory)).toHaveBeenCalledOnce();
493+
expect(view.flattenNodes()).toHaveLength(2);
494+
});
495+
467496
it('suppresses ably-message events for withheld nodes', async () => {
468497
const page = makePage(
469498
[{ id: '1', content: 'a' }, { id: '2', content: 'b' }, { id: '3', content: 'c' }],

0 commit comments

Comments
 (0)