Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions src/core/transport/client-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ import { createView, type DefaultView } from './view.js';
// eslint-disable-next-line @typescript-eslint/no-empty-function -- intentional no-op
const noopUnsubscribe = (): void => {};

// ---------------------------------------------------------------------------
// Internal state machine
// ---------------------------------------------------------------------------

enum ClientTransportState {
READY = 'ready',
CLOSED = 'closed',
}

// ---------------------------------------------------------------------------
// Event map for the transport's typed EventEmitter
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -128,7 +137,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent
private readonly _attachPromise: Promise<unknown>;
private readonly _onMessage: (msg: Ably.InboundMessage) => void;

private _closed = false;
private _state = ClientTransportState.READY;

constructor(options: ClientTransportOptions<TEvent, TMessage>) {
this._channel = options.channel;
Expand Down Expand Up @@ -201,7 +210,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent
// ---------------------------------------------------------------------------

private _handleMessage(ablyMessage: Ably.InboundMessage): void {
if (this._closed) return;
if (this._state === ClientTransportState.CLOSED) return;

try {
// Spec: AIT-CT16a
Expand Down Expand Up @@ -555,7 +564,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent

// Spec: AIT-CT10b
createView(): View<TEvent, TMessage> {
if (this._closed) {
if (this._state === ClientTransportState.CLOSED) {
throw new Ably.ErrorInfo('unable to create view; transport is closed', ErrorCode.TransportClosed, 400);
}
this._logger.trace('DefaultClientTransport.createView();');
Expand All @@ -578,14 +587,14 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent
history: MessageNode<TMessage>[],
eventNodes?: EventsNode<TEvent>[],
): Promise<ActiveTurn<TEvent>> {
if (this._closed) {
if (this._state === ClientTransportState.CLOSED) {
throw new Ably.ErrorInfo('unable to send; transport is closed', ErrorCode.TransportClosed, 400);
}
await this._attachPromise;
// CAST: re-check after await — close() may have been called while waiting for attach.
// TypeScript's control flow narrows _closed to false after the first check, but the
// await yields and close() can mutate _closed concurrently.
if (this._closed as boolean) {
// TypeScript's control flow narrows _state after the first check, but the
// await yields and close() can mutate _state concurrently.
if ((this._state as ClientTransportState) === ClientTransportState.CLOSED) {
throw new Ably.ErrorInfo('unable to send; transport is closed', ErrorCode.TransportClosed, 400);
}

Expand Down Expand Up @@ -751,7 +760,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent

// Spec: AIT-CT7, AIT-CT7a
async cancel(filter?: CancelFilter): Promise<void> {
if (this._closed) return;
if (this._state === ClientTransportState.CLOSED) return;
const resolved = filter ?? { own: true };
this._logger.debug('ClientTransport.cancel();', { filter: resolved });
await this._publishCancel(resolved);
Expand All @@ -760,7 +769,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent

// Spec: AIT-CT18
async waitForTurn(filter?: CancelFilter): Promise<void> {
if (this._closed) return;
if (this._state === ClientTransportState.CLOSED) return;
const resolved = filter ?? { own: true };
const remaining = this._getMatchingTurnIds(resolved);
if (remaining.size === 0) return;
Expand Down Expand Up @@ -791,7 +800,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent

// Spec: AIT-CT8, AIT-CT8c, AIT-CT8d
on(event: 'error', handler: (error: Ably.ErrorInfo) => void): () => void {
if (this._closed) return noopUnsubscribe;
if (this._state === ClientTransportState.CLOSED) return noopUnsubscribe;
// CAST: the overload signature enforces the correct handler type.
const cb = handler as (arg: ClientTransportEventsMap[keyof ClientTransportEventsMap]) => void;
this._emitter.on(event, cb);
Expand All @@ -802,8 +811,8 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent

// Spec: AIT-CT12, AIT-CT12a, AIT-CT12b, AIT-CT10c
async close(options?: CloseOptions): Promise<void> {
if (this._closed) return;
this._closed = true;
if (this._state === ClientTransportState.CLOSED) return;
this._state = ClientTransportState.CLOSED;
this._logger.info('ClientTransport.close();');

// Best-effort cancel publish before tearing down local state
Expand Down
29 changes: 19 additions & 10 deletions src/core/transport/server-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ interface RegisteredTurn {
onError?: (error: Ably.ErrorInfo) => void;
}

// ---------------------------------------------------------------------------
// Internal state machine
// ---------------------------------------------------------------------------

enum TurnState {
INITIALIZED = 'initialized',
STARTED = 'started',
ENDED = 'ended',
}

// ---------------------------------------------------------------------------
// Implementation
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -252,8 +262,7 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
} = turnOpts;

const controller = new AbortController();
let started = false;
let ended = false;
let state = TurnState.INITIALIZED;

// Spec: AIT-ST3a — register immediately so early cancels can fire the abort signal.
const registration: RegisteredTurn = {
Expand Down Expand Up @@ -294,8 +303,8 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
400,
);
}
if (started) return;
started = true;
if (state !== TurnState.INITIALIZED) return;
state = TurnState.STARTED;

try {
await turnManager.startTurn(turnId, turnClientId, controller, {
Expand All @@ -321,7 +330,7 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
addMessages: async (nodes: MessageNode<TMessage>[], opts?: AddMessageOptions): Promise<AddMessagesResult> => {
logger?.trace('Turn.addMessages();', { turnId, count: nodes.length });

if (!started) {
if (state === TurnState.INITIALIZED) {
throw new Ably.ErrorInfo(
`unable to add messages; start() must be called before addMessages() (turn ${turnId})`,
ErrorCode.InvalidArgument,
Expand Down Expand Up @@ -364,7 +373,7 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
addEvents: async (nodes: EventsNode<TEvent>[]): Promise<void> => {
logger?.trace('Turn.addEvents();', { turnId, count: nodes.length });

if (!started) {
if (state === TurnState.INITIALIZED) {
throw new Ably.ErrorInfo(
`unable to add events; start() must be called before addEvents() (turn ${turnId})`,
ErrorCode.InvalidArgument,
Expand Down Expand Up @@ -406,7 +415,7 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
): Promise<StreamResult> => {
logger?.trace('Turn.streamResponse();', { turnId });

if (!started) {
if (state === TurnState.INITIALIZED) {
throw new Ably.ErrorInfo(
`unable to stream response; start() must be called before streamResponse() (turn ${turnId})`,
ErrorCode.InvalidArgument,
Expand Down Expand Up @@ -444,15 +453,15 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
end: async (reason: TurnEndReason): Promise<void> => {
logger?.trace('Turn.end();', { turnId, reason });

if (!started) {
if (state === TurnState.INITIALIZED) {
throw new Ably.ErrorInfo(
`unable to end turn; start() must be called before end() (turn ${turnId})`,
ErrorCode.InvalidArgument,
400,
);
}
if (ended) return;
ended = true;
if (state === TurnState.ENDED) return;
state = TurnState.ENDED;

try {
await turnManager.endTurn(turnId, reason);
Expand Down
4 changes: 2 additions & 2 deletions src/core/transport/turn-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface TurnManager {
// Internal state
// ---------------------------------------------------------------------------

interface TurnState {
interface ActiveTurnsEntry {
controller: AbortController;
clientId: string;
}
Expand All @@ -63,7 +63,7 @@ interface TurnState {
class DefaultTurnManager implements TurnManager {
private readonly _channel: Ably.RealtimeChannel;
private readonly _logger: Logger | undefined;
private readonly _activeTurns = new Map<string, TurnState>();
private readonly _activeTurns = new Map<string, ActiveTurnsEntry>();

constructor(channel: Ably.RealtimeChannel, logger?: Logger) {
this._channel = channel;
Expand Down
Loading