diff --git a/packages/subscribable/src/async-iterable.ts b/packages/subscribable/src/async-iterable.ts index 92d3a9512..7d6e6c212 100644 --- a/packages/subscribable/src/async-iterable.ts +++ b/packages/subscribable/src/async-iterable.ts @@ -8,13 +8,29 @@ import { AbortController } from '@solana/event-target-impl'; import { DataPublisher } from './data-publisher'; type Config = Readonly<{ + /** + * Triggering this abort signal will cause all iterators spawned from this iterator to return + * once they have published all queued messages. + */ abortSignal: AbortSignal; + /** + * Messages from this channel of `dataPublisher` will be the ones yielded through the iterators. + * + * Messages only begin to be queued after the first time an iterator begins to poll. Channel + * messages published before that time will be dropped. + */ dataChannelName: string; // FIXME: It would be nice to be able to constrain the type of `dataPublisher` to one that // definitely supports the `dataChannelName` and `errorChannelName` channels, and // furthermore publishes `TData` on the `dataChannelName` channel. This is more difficult // than it should be: https://tsplay.dev/NlZelW dataPublisher: DataPublisher; + /** + * Messages from this channel of `dataPublisher` will be the ones thrown through the iterators. + * + * Any new iterators created after the first error is encountered will reject with that error + * when polled. + */ errorChannelName: string; }>; @@ -58,6 +74,48 @@ function createExplicitAbortToken() { const UNINITIALIZED = Symbol(); +/** + * Returns an `AsyncIterable` given a data publisher. + * + * The iterable will produce iterators that vend messages published to `dataChannelName` and will + * throw the first time a message is published to `errorChannelName`. Triggering the abort signal + * will cause all iterators spawned from this iterator to return once they have published all queued + * messages. + * + * Things to note: + * + * - If a message is published over a channel before the `AsyncIterator` attached to it has polled + * for the next result, the message will be queued in memory. + * - Messages only begin to be queued after the first time an iterator begins to poll. Channel + * messages published before that time will be dropped. + * - If there are messages in the queue and an error occurs, all queued messages will be vended to + * the iterator before the error is thrown. + * - If there are messages in the queue and the abort signal fires, all queued messages will be + * vended to the iterator after which it will return. + * - Any new iterators created after the first error is encountered will reject with that error when + * polled. + * + * @param config + * + * @example + * ```ts + * const iterable = createAsyncIterableFromDataPublisher({ + * abortSignal: AbortSignal.timeout(10_000), + * dataChannelName: 'message', + * dataPublisher, + * errorChannelName: 'error', + * }); + * try { + * for await (const message of iterable) { + * console.log('Got message', message); + * } + * } catch (e) { + * console.error('An error was published to the error channel', e); + * } finally { + * console.log("It's been 10 seconds; that's enough for now."); + * } + * ``` + */ export function createAsyncIterableFromDataPublisher({ abortSignal, dataChannelName, diff --git a/packages/subscribable/src/data-publisher.ts b/packages/subscribable/src/data-publisher.ts index 59367bf2f..9fd804658 100644 --- a/packages/subscribable/src/data-publisher.ts +++ b/packages/subscribable/src/data-publisher.ts @@ -2,7 +2,29 @@ import { TypedEventEmitter, TypedEventTarget } from './event-emitter'; type UnsubscribeFn = () => void; +/** + * Represents an object with an `on` function that you can call to subscribe to certain data over a + * named channel. + * + * @example + * ```ts + * let dataPublisher: DataPublisher<{ error: SolanaError }>; + * dataPublisher.on('data', handleData); // ERROR. `data` is not a known channel name. + * dataPublisher.on('error', e => { + * console.error(e); + * }); // OK. + * ``` + */ export interface DataPublisher = Record> { + /** + * Call this to subscribe to data over a named channel. + * + * @param channelName The name of the channel on which to subscribe for messages + * @param subscriber The function to call when a message becomes available + * @param options.signal An abort signal you can fire to unsubscribe + * + * @returns A function that you can call to unsubscribe + */ on( channelName: TChannelName, subscriber: (data: TDataByChannelName[TChannelName]) => void, @@ -10,6 +32,23 @@ export interface DataPublisher { + * if (JSON.parse(message.data).id === 42) { + * console.log('Got response 42'); + * unsubscribe(); + * } + * }); + * ``` + */ export function getDataPublisherFromEventEmitter>( eventEmitter: TypedEventEmitter | TypedEventTarget, ): DataPublisher<{ diff --git a/packages/subscribable/src/demultiplex.ts b/packages/subscribable/src/demultiplex.ts index 0b7e21497..8a0cfb7ae 100644 --- a/packages/subscribable/src/demultiplex.ts +++ b/packages/subscribable/src/demultiplex.ts @@ -2,6 +2,37 @@ import { EventTarget } from '@solana/event-target-impl'; import { DataPublisher, getDataPublisherFromEventEmitter } from './data-publisher'; +/** + * Given a channel that carries messages for multiple subscribers on a single channel name, this + * function returns a new {@link DataPublisher} that splits them into multiple channel names. + * + * @param messageTransformer A function that receives the message as the first argument, and returns + * a tuple of the derived channel name and the message. + * + * @example + * Imagine a channel that carries multiple notifications whose destination is contained within the + * message itself. + * + * ```ts + * const demuxedDataPublisher = demultiplexDataPublisher(channel, 'message', message => { + * const destinationChannelName = `notification-for:${message.subscriberId}`; + * return [destinationChannelName, message]; + * }); + * ``` + * + * Now you can subscribe to _only_ the messages you are interested in, without having to subscribe + * to the entire `'message'` channel and filter out the messages that are not for you. + * + * ```ts + * demuxedDataPublisher.on( + * 'notification-for:123', + * message => { + * console.log('Got a message for subscriber 123', message); + * }, + * { signal: AbortSignal.timeout(5_000) }, + * ); + * ``` + */ export function demultiplexDataPublisher< TDataPublisher extends DataPublisher, const TChannelName extends Parameters[0], diff --git a/packages/subscribable/src/event-emitter.ts b/packages/subscribable/src/event-emitter.ts index 77839a5ea..fdc94b37c 100644 --- a/packages/subscribable/src/event-emitter.ts +++ b/packages/subscribable/src/event-emitter.ts @@ -1,6 +1,19 @@ type EventMap = Record; type Listener = ((evt: TEvent) => void) | { handleEvent(object: TEvent): void }; +/** + * This type allows you to type `addEventListener` and `removeEventListener` so that the call + * signature of the listener matches the event type given. + * + * @example + * ```ts + * const emitter: TypedEventEmitter<{ message: MessageEvent }> = new WebSocket('wss://api.devnet.solana.com'); + * emitter.addEventListener('data', handleData); // ERROR. `data` is not a known event type. + * emitter.addEventListener('message', message => { + * console.log(message.origin); // OK. `message` is a `MessageEvent` so it has an `origin` property. + * }); + * ``` + */ export interface TypedEventEmitter { addEventListener( type: TEventType, @@ -14,9 +27,18 @@ export interface TypedEventEmitter { ): void; } +// Why not just extend the interface above, rather than to copy/paste it? +// See https://github.com/microsoft/TypeScript/issues/60008 /** - * Why not just extend the interface above, rather than to copy/paste it? - * See https://github.com/microsoft/TypeScript/issues/60008 + * This type is a superset of `TypedEventEmitter` that allows you to constrain calls to + * `dispatchEvent`. + * + * @example + * ```ts + * const target: TypedEventTarget<{ candyVended: CustomEvent<{ flavour: string }> }> = new EventTarget(); + * target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavour: 'raspberry' } })); // OK. + * target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavor: 'raspberry' } })); // ERROR. Misspelling in detail. + * ``` */ export interface TypedEventTarget { addEventListener( diff --git a/packages/subscribable/src/index.ts b/packages/subscribable/src/index.ts index 43b987822..7d8f0ea7f 100644 --- a/packages/subscribable/src/index.ts +++ b/packages/subscribable/src/index.ts @@ -1,3 +1,11 @@ +/** + * This package contains utilities for creating subscription-based event targets. These differ from + * the `EventTarget` interface in that the method you use to add a listener returns an unsubscribe + * function. It is primarily intended for internal use -- particularly for those building + * {@link RpcSubscriptionChannel | RpcSubscriptionChannels} and associated infrastructure. + * + * @packageDocumentation + */ export * from './async-iterable'; export * from './data-publisher'; export * from './demultiplex';