Skip to content

Document @solana/subscribable with TypeDoc #373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 04-10-update_the_gettransaction_documentation_to_mostly_match_that_in_the_solana_rpc-types_documentation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions packages/subscribable/src/async-iterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,29 @@ import { AbortController } from '@solana/event-target-impl';
import { DataPublisher } from './data-publisher';

type Config = Readonly<{
/**
* Triggering this abort signal will cause all iterators spawned from this iterator to return
* once they have published all queued messages.
*/
abortSignal: AbortSignal;
/**
* Messages from this channel of `dataPublisher` will be the ones yielded through the iterators.
*
* Messages only begin to be queued after the first time an iterator begins to poll. Channel
* messages published before that time will be dropped.
*/
dataChannelName: string;
// FIXME: It would be nice to be able to constrain the type of `dataPublisher` to one that
// definitely supports the `dataChannelName` and `errorChannelName` channels, and
// furthermore publishes `TData` on the `dataChannelName` channel. This is more difficult
// than it should be: https://tsplay.dev/NlZelW
dataPublisher: DataPublisher;
/**
* Messages from this channel of `dataPublisher` will be the ones thrown through the iterators.
*
* Any new iterators created after the first error is encountered will reject with that error
* when polled.
*/
errorChannelName: string;
}>;

Expand Down Expand Up @@ -58,6 +74,48 @@ function createExplicitAbortToken() {

const UNINITIALIZED = Symbol();

/**
* Returns an `AsyncIterable` given a data publisher.
*
* The iterable will produce iterators that vend messages published to `dataChannelName` and will
* throw the first time a message is published to `errorChannelName`. Triggering the abort signal
* will cause all iterators spawned from this iterator to return once they have published all queued
* messages.
*
* Things to note:
*
* - If a message is published over a channel before the `AsyncIterator` attached to it has polled
* for the next result, the message will be queued in memory.
* - Messages only begin to be queued after the first time an iterator begins to poll. Channel
* messages published before that time will be dropped.
* - If there are messages in the queue and an error occurs, all queued messages will be vended to
* the iterator before the error is thrown.
* - If there are messages in the queue and the abort signal fires, all queued messages will be
* vended to the iterator after which it will return.
* - Any new iterators created after the first error is encountered will reject with that error when
* polled.
*
* @param config
*
* @example
* ```ts
* const iterable = createAsyncIterableFromDataPublisher({
* abortSignal: AbortSignal.timeout(10_000),
* dataChannelName: 'message',
* dataPublisher,
* errorChannelName: 'error',
* });
* try {
* for await (const message of iterable) {
* console.log('Got message', message);
* }
* } catch (e) {
* console.error('An error was published to the error channel', e);
* } finally {
* console.log("It's been 10 seconds; that's enough for now.");
* }
* ```
*/
export function createAsyncIterableFromDataPublisher<TData>({
abortSignal,
dataChannelName,
Expand Down
39 changes: 39 additions & 0 deletions packages/subscribable/src/data-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,53 @@ import { TypedEventEmitter, TypedEventTarget } from './event-emitter';

type UnsubscribeFn = () => void;

/**
* Represents an object with an `on` function that you can call to subscribe to certain data over a
* named channel.
*
* @example
* ```ts
* let dataPublisher: DataPublisher<{ error: SolanaError }>;
* dataPublisher.on('data', handleData); // ERROR. `data` is not a known channel name.
* dataPublisher.on('error', e => {
* console.error(e);
* }); // OK.
* ```
*/
export interface DataPublisher<TDataByChannelName extends Record<string, unknown> = Record<string, unknown>> {
/**
* Call this to subscribe to data over a named channel.
*
* @param channelName The name of the channel on which to subscribe for messages
* @param subscriber The function to call when a message becomes available
* @param options.signal An abort signal you can fire to unsubscribe
*
* @returns A function that you can call to unsubscribe
*/
on<const TChannelName extends keyof TDataByChannelName>(
channelName: TChannelName,
subscriber: (data: TDataByChannelName[TChannelName]) => void,
options?: { signal: AbortSignal },
): UnsubscribeFn;
}

/**
* Returns an object with an `on` function that you can call to subscribe to certain data over a
* named channel.
*
* The `on` function returns an unsubscribe function.
*
* @example
* ```ts
* const socketDataPublisher = getDataPublisherFromEventEmitter(new WebSocket('wss://api.devnet.solana.com'));
* const unsubscribe = socketDataPublisher.on('message', message => {
* if (JSON.parse(message.data).id === 42) {
* console.log('Got response 42');
* unsubscribe();
* }
* });
* ```
*/
export function getDataPublisherFromEventEmitter<TEventMap extends Record<string, Event>>(
eventEmitter: TypedEventEmitter<TEventMap> | TypedEventTarget<TEventMap>,
): DataPublisher<{
Expand Down
31 changes: 31 additions & 0 deletions packages/subscribable/src/demultiplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,37 @@ import { EventTarget } from '@solana/event-target-impl';

import { DataPublisher, getDataPublisherFromEventEmitter } from './data-publisher';

/**
* Given a channel that carries messages for multiple subscribers on a single channel name, this
* function returns a new {@link DataPublisher} that splits them into multiple channel names.
*
* @param messageTransformer A function that receives the message as the first argument, and returns
* a tuple of the derived channel name and the message.
*
* @example
* Imagine a channel that carries multiple notifications whose destination is contained within the
* message itself.
*
* ```ts
* const demuxedDataPublisher = demultiplexDataPublisher(channel, 'message', message => {
* const destinationChannelName = `notification-for:${message.subscriberId}`;
* return [destinationChannelName, message];
* });
* ```
*
* Now you can subscribe to _only_ the messages you are interested in, without having to subscribe
* to the entire `'message'` channel and filter out the messages that are not for you.
*
* ```ts
* demuxedDataPublisher.on(
* 'notification-for:123',
* message => {
* console.log('Got a message for subscriber 123', message);
* },
* { signal: AbortSignal.timeout(5_000) },
* );
* ```
*/
export function demultiplexDataPublisher<
TDataPublisher extends DataPublisher,
const TChannelName extends Parameters<TDataPublisher['on']>[0],
Expand Down
26 changes: 24 additions & 2 deletions packages/subscribable/src/event-emitter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
type EventMap = Record<string, Event>;
type Listener<TEvent extends Event> = ((evt: TEvent) => void) | { handleEvent(object: TEvent): void };

/**
* This type allows you to type `addEventListener` and `removeEventListener` so that the call
* signature of the listener matches the event type given.
*
* @example
* ```ts
* const emitter: TypedEventEmitter<{ message: MessageEvent }> = new WebSocket('wss://api.devnet.solana.com');
* emitter.addEventListener('data', handleData); // ERROR. `data` is not a known event type.
* emitter.addEventListener('message', message => {
* console.log(message.origin); // OK. `message` is a `MessageEvent` so it has an `origin` property.
* });
* ```
*/
export interface TypedEventEmitter<TEventMap extends EventMap> {
addEventListener<const TEventType extends keyof TEventMap>(
type: TEventType,
Expand All @@ -14,9 +27,18 @@ export interface TypedEventEmitter<TEventMap extends EventMap> {
): void;
}

// Why not just extend the interface above, rather than to copy/paste it?
// See https://github.com/microsoft/TypeScript/issues/60008
/**
* Why not just extend the interface above, rather than to copy/paste it?
* See https://github.com/microsoft/TypeScript/issues/60008
* This type is a superset of `TypedEventEmitter` that allows you to constrain calls to
* `dispatchEvent`.
*
* @example
* ```ts
* const target: TypedEventTarget<{ candyVended: CustomEvent<{ flavour: string }> }> = new EventTarget();
* target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavour: 'raspberry' } })); // OK.
* target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavor: 'raspberry' } })); // ERROR. Misspelling in detail.
* ```
*/
export interface TypedEventTarget<TEventMap extends EventMap> {
addEventListener<const TEventType extends keyof TEventMap>(
Expand Down
8 changes: 8 additions & 0 deletions packages/subscribable/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/**
* This package contains utilities for creating subscription-based event targets. These differ from
* the `EventTarget` interface in that the method you use to add a listener returns an unsubscribe
* function. It is primarily intended for internal use -- particularly for those building
* {@link RpcSubscriptionChannel | RpcSubscriptionChannels} and associated infrastructure.
*
* @packageDocumentation
*/
export * from './async-iterable';
export * from './data-publisher';
export * from './demultiplex';
Expand Down