Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions apps/api/src/ws/connection-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import type { RealtimeEvent } from "@cossistant/types/realtime-events";
import type { ServerWebSocket } from "bun";
import type { DispatchOptions } from "./router";

export type RawSocket = ServerWebSocket & { connectionId?: string };

export type LocalConnectionRecord = {
socket: RawSocket;
websiteId?: string;
organizationId?: string;
userId?: string;
visitorId?: string;
};

export const localConnections = new Map<string, LocalConnectionRecord>();

function createExcludePredicate(
options?: DispatchOptions
): ((connectionId: string) => boolean) | undefined {
if (!options?.exclude) {
return;
}

const excludeIds = Array.isArray(options.exclude)
? new Set(options.exclude)
: new Set<string>([options.exclude]);

return (connectionId: string) => excludeIds.has(connectionId);
}

function sendEventToSocket(
record: LocalConnectionRecord,
serializedEvent: string
): void {
try {
record.socket.send(serializedEvent);
} catch (error) {
console.error("[WebSocket] Failed to send event:", error);
}
}

export function dispatchEventToLocalConnection(
connectionId: string,
event: RealtimeEvent
): void {
const connection = localConnections.get(connectionId);
if (!connection) {
return;
}

const serializedEvent = JSON.stringify(event);
sendEventToSocket(connection, serializedEvent);
}

export function dispatchEventToLocalVisitor(
visitorId: string,
event: RealtimeEvent,
options?: DispatchOptions
): void {
const shouldExclude = createExcludePredicate(options);
const serializedEvent = JSON.stringify(event);

for (const [connectionId, connection] of localConnections) {
if (connection.visitorId !== visitorId) {
continue;
}

if (shouldExclude?.(connectionId)) {
continue;
}

console.log("[WebSocket] Dispatching visitor event", {
visitorId,
connectionId,
eventType: event.type,
});
sendEventToSocket(connection, serializedEvent);
}
}

export function dispatchEventToLocalWebsite(
websiteId: string,
event: RealtimeEvent,
options?: DispatchOptions
): void {
const shouldExclude = createExcludePredicate(options);
const serializedEvent = JSON.stringify(event);

for (const [connectionId, connection] of localConnections) {
if (connection.websiteId !== websiteId) {
continue;
}

// Only dashboard/user connections should receive website events
if (!connection.userId) {
continue;
}

if (shouldExclude?.(connectionId)) {
continue;
}

console.log("[WebSocket] Dispatching website event", {
websiteId,
connectionId,
eventType: event.type,
});
sendEventToSocket(connection, serializedEvent);
}
}
280 changes: 280 additions & 0 deletions apps/api/src/ws/realtime-pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
import { env } from "@api/env";
import {
isValidEventType,
type RealtimeEvent,
validateRealtimeEvent,
} from "@cossistant/types/realtime-events";
import { Redis } from "@upstash/redis";
import type { DispatchOptions } from "./router";

const REALTIME_CHANNEL = "realtime:dispatch";
const MAX_PUBLISH_RETRIES = 3;
const BASE_RETRY_DELAY_MS = 100;

const redisConfig = {
url: env.UPSTASH_REDIS_REST_URL,
token: env.UPSTASH_REDIS_REST_TOKEN,
} as const;
const publisher = new Redis(redisConfig);
const subscriberClient = new Redis(redisConfig);

const instanceId = `api-${process.pid ?? "pid"}-${Math.random()
.toString(36)
.slice(2, 10)}`;

type SubscriberInstance = ReturnType<typeof subscriberClient.subscribe>;

Comment on lines +25 to +26

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Type bug: SubscriberInstance should await the subscribe() return type.

subscribe() in @upstash/redis returns a Subscriber (often via Promise). Using ReturnType without Awaited makes teardownSubscriber’s unsubscribe() unsafe at compile/runtime.

Apply:

-type SubscriberInstance = ReturnType<typeof subscriberClient.subscribe>;
+type SubscriberInstance = Awaited<ReturnType<typeof subscriberClient.subscribe>>;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type SubscriberInstance = ReturnType<typeof subscriberClient.subscribe>;
type SubscriberInstance = Awaited<ReturnType<typeof subscriberClient.subscribe>>;
🤖 Prompt for AI Agents
In apps/api/src/ws/realtime-pubsub.ts around lines 25 to 26, the type alias
SubscriberInstance uses ReturnType<typeof subscriberClient.subscribe> which
doesn’t account for subscribe() returning a Promise; change the alias to await
the promise (e.g., use Awaited<ReturnType<typeof subscriberClient.subscribe>> or
wrap with PromiseResolve) so teardownSubscriber can safely call unsubscribe() at
compile and runtime, and update any related variable declarations to match the
awaited Subscriber type.

type DispatchTarget =
| {
type: "connection";
id: string;
}
| {
type: "visitor" | "website";
id: string;
exclude?: string[];
};

type DispatchEnvelope = {
sourceId: string;
target: DispatchTarget;
event: RealtimeEvent;
};

type LocalDispatchers = {
connection: (connectionId: string, event: RealtimeEvent) => void;
visitor: (
visitorId: string,
event: RealtimeEvent,
options?: DispatchOptions
) => void;
website: (
websiteId: string,
event: RealtimeEvent,
options?: DispatchOptions
) => void;
};

let subscriber: SubscriberInstance | null = null;
let dispatchersRef: LocalDispatchers | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;

function normalizeExclude(options?: DispatchOptions): string[] | undefined {
if (!options?.exclude) {
return;
}

return Array.isArray(options.exclude) ? options.exclude : [options.exclude];
}

function scheduleReconnect(): void {
if (reconnectTimer) {
return;
}

reconnectTimer = setTimeout(() => {
reconnectTimer = null;
startSubscription();
}, 1000);
}

async function teardownSubscriber(): Promise<void> {
const activeSubscriber = subscriber;
subscriber = null;
if (!activeSubscriber) {
return;
}

try {
await activeSubscriber.unsubscribe();
} catch (error) {
console.error("[RealtimePubSub] Failed to unsubscribe from Redis:", error);
}
}

function handleEnvelope(envelope: DispatchEnvelope | undefined): void {
if (!envelope) {
return;
}

const dispatchers = dispatchersRef;
if (!dispatchers) {
return;
}

const { event, target } = envelope;

if (!isValidEventType(event.type)) {
console.error("[RealtimePubSub] Ignoring invalid event type", event.type);
return;
}

try {
validateRealtimeEvent(event.type, event.data);
} catch (error) {
console.error(
"[RealtimePubSub] Ignoring event with invalid payload",
error
);
return;
}

const exclude =
target.type === "connection"
? undefined
: target.exclude?.filter(
(value): value is string => typeof value === "string"
);
const options = exclude?.length
? ({ exclude } satisfies DispatchOptions)
: undefined;

try {
switch (target.type) {
case "connection": {
dispatchers.connection(target.id, event);
break;
}
case "visitor": {
dispatchers.visitor(target.id, event, options);
break;
}
case "website": {
dispatchers.website(target.id, event, options);
break;
}
default: {
const exhaustiveCheck: never = target;
console.error(
"[RealtimePubSub] Unsupported dispatch target",
exhaustiveCheck
);
}
}
} catch (error) {
console.error("[RealtimePubSub] Failed to dispatch realtime event", error);
}
}

function startSubscription(): void {
if (!dispatchersRef) {
return;
}

if (subscriber) {
return;
}

try {
const nextSubscriber =
subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL);
nextSubscriber.on("message", ({ message }) => {
handleEnvelope(message);
});
nextSubscriber.on("error", (error) => {
console.error("[RealtimePubSub] Subscription error", error);
teardownSubscriber()
.catch((teardownError) => {
console.error(
"[RealtimePubSub] Failed to teardown subscriber after error",
teardownError
);
})
.finally(() => {
scheduleReconnect();
});
});

subscriber = nextSubscriber;
} catch (error) {
console.error(
"[RealtimePubSub] Failed to subscribe to realtime channel",
error
);
scheduleReconnect();
}
}
Comment on lines +168 to +196

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Likely incorrect subscribe API usage (shape and sync/async assumptions)

Code assumes an EventEmitter-style .on("message"|"error") and a sync subscribe(); Upstash’s docs emphasize REST/SSE for SUBSCRIBE, and examples often use ioredis for subscriptions. Verify the actual TS SDK API; you may need await redis.subscribe({ channel, onMessage }), and to JSON.parse payloads.

-  try {
-    const nextSubscriber =
-      subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL);
-    nextSubscriber.on("message", ({ message }) => {
-      handleEnvelope(message);
-    });
-    nextSubscriber.on("error", (error) => {
-      console.error("[RealtimePubSub] Subscription error", error);
-      teardownSubscriber()
-        .catch((teardownError) => {
-          console.error(
-            "[RealtimePubSub] Failed to teardown subscriber after error",
-            teardownError
-          );
-        })
-        .finally(() => {
-          scheduleReconnect();
-        });
-    });
-    subscriber = nextSubscriber;
-  } catch (error) {
+  try {
+    // Adjust to the actual SDK signature; many installs require `await`.
+    const sub = await (subscriberClient as any).subscribe(REALTIME_CHANNEL, (raw: unknown) => {
+      const msg = typeof raw === "string" ? safeParse<DispatchEnvelope>(raw) : (raw as DispatchEnvelope | undefined);
+      handleEnvelope(msg);
+    });
+    // If the SDK provides an error callback, wire it here:
+    // sub.onError?.((error: unknown) => { ... });
+    subscriber = sub;
+  } catch (error) {
     console.error(
       "[RealtimePubSub] Failed to subscribe to realtime channel",
       error
     );
     scheduleReconnect();
   }

Add a small helper near the top:

function safeParse<T>(s: string): T | undefined {
  try { return JSON.parse(s) as T; } catch { return undefined; }
}

Citations for context on publish/subscribe and SSE: Upstash TS publish docs and REST SUBSCRIBE docs. (upstash.com)


🌐 Web query:

What is the correct TypeScript subscribe API for @upstash/redis Pub/Sub (callback signature, return type, and error handling) as of September 2025?

💡 Result:

Short answer: there is no long‑running, callback‑style TypeScript subscribe() in @upstash/redis — the HTTP/REST SDK exposes Redis pub/sub commands (PUBLISH and the PUBSUB info commands) but not a persistent server‑push SUBSCRIBE websocket/tcp stream with a callback signature. Use PUBLISH/PUBSUB via the SDK or use a standard TCP Redis client (ioredis/node‑redis) for real time SUBSCRIBE/PSUBSCRIBE callbacks.

Key citations:

  • Upstash REST API feature table: Pub/Sub — only PUBLISH and PUBSUB are supported via REST. [1]
  • SDK docs / changelog: the @upstash/redis SDK is an HTTP/REST client; errors are thrown by commands (not returned as {data,error}) — i.e., follow normal async/await try/catch error handling. [2][3]

Implication / recommended usage (concise):

  • If you need a callback signature like (channel: string, message: string) => void and a return/unsubscribe function, use a TCP Redis client (ioredis or node‑redis) and its subscribe API (it provides subscribe(channel, cb) or emitter events and returns an unsubscribe). Upstash’s HTTP SDK only supports publishing and PUBSUB inspection; handle errors with try/catch around async calls. [1][2]

References:

  1. Upstash REST API / Features (Pub/Sub support notes). [1]
  2. Upstash TypeScript SDK docs (overview / get started / advanced — SDK is HTTP based; errors thrown). [2][3]

If you want, I can show (a) example code using ioredis subscribe(callback) or (b) how to publish / inspect pubsub with @upstash/redis and handle errors via try/catch.


Fix pub/sub: @upstash/redis has no long‑running subscribe — change implementation

apps/api/src/ws/realtime-pubsub.ts (lines 170–198): the code assumes an EventEmitter-style, long‑running subscribe(...) with .on("message"|"error"). @upstash/redis is an HTTP/REST client (no persistent SUBSCRIBE callback) — this will fail if subscriberClient is an Upstash client.

  • If you need real‑time SUBSCRIBE, replace subscriberClient with a TCP Redis client (ioredis or node‑redis) and use its subscribe/client.on('message', ...) API; wire unsubscribe/close and error handlers accordingly.
  • If you must keep @upstash/redis, stop relying on .on('message') and instead use PUBLISH + PUBSUB inspection (or add a separate SSE/WebSocket layer or polling) for real‑time delivery.
  • Parse payloads safely (JSON.parse with try/catch or a safeParse helper) before calling handleEnvelope and validate the parsed object.
  • Ensure teardown/reconnect logic is triggered from the chosen client's real error/unsubscribe callbacks and that async subscribe calls are awaited and wrapped in try/catch.
🤖 Prompt for AI Agents
In apps/api/src/ws/realtime-pubsub.ts around lines 170–198 the code incorrectly
treats subscriberClient as an EventEmitter long‑running subscriber (using
.on("message"| "error")) which does not work with @upstash/redis HTTP client;
replace or adapt the implementation: either swap subscriberClient for a TCP
Redis client (ioredis or node‑redis) and use its subscribe/await subscribe call
plus client.on('message') and client.on('error') handlers (ensure
unsubscribe/quit teardown and reconnection logic are wired to those events), or
if you must keep @upstash/redis, remove the .on handlers and implement an
alternative (polling, SSE/websocket, or inspect PUBSUB via REST) for real‑time
delivery; additionally, always parse incoming payloads with a safe JSON.parse
try/catch (or safeParse helper) and validate the parsed DispatchEnvelope before
calling handleEnvelope, and ensure all async subscribe/teardown calls are
awaited and wrapped in try/catch so teardown/reconnect is triggered from real
error/unsubscribe outcomes.

Comment on lines +159 to +196

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Broken subscription wiring: wrong API shape and missing await/ready.

This treats the Upstash subscriber like an EventEmitter with .on("message"/"error") and calls subscribe() synchronously. The SDK exposes a Subscriber for REST/SSE SUBSCRIBE; you should await the subscription and (newer versions) await sub.ready before publishing/consuming. Also parse JSON payloads before dispatch.

Suggested minimal fix (pattern; adjust to your installed SDK version):

-function startSubscription(): void {
+async function startSubscription(): Promise<void> {
   if (!dispatchersRef) {
     return;
   }
   if (subscriber) {
     return;
   }
-
-  try {
-    const nextSubscriber =
-      subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL);
-    nextSubscriber.on("message", ({ message }) => {
-      handleEnvelope(message);
-    });
-    nextSubscriber.on("error", (error) => {
-      console.error("[RealtimePubSub] Subscription error", error);
-      teardownSubscriber()
-        .catch((teardownError) => {
-          console.error(
-            "[RealtimePubSub] Failed to teardown subscriber after error",
-            teardownError
-          );
-        })
-        .finally(() => {
-          scheduleReconnect();
-        });
-    });
-    subscriber = nextSubscriber;
-  } catch (error) {
+  try {
+    // Upstash SUBSCRIBE runs over SSE; await the subscriber and its readiness signal when available.
+    const sub = await (subscriberClient as any).subscribe(REALTIME_CHANNEL, (raw: unknown) => {
+      const msg = typeof raw === "string" ? safeParse<DispatchEnvelope>(raw) : (raw as DispatchEnvelope | undefined);
+      handleEnvelope(msg);
+    });
+    await (sub?.ready ?? Promise.resolve());
+    // Prefer real client hooks if provided (e.g. sub.onError):
+    sub?.onError?.((error: unknown) => {
+      logError("[RealtimePubSub] Subscription error", error);
+      teardownSubscriber().finally(scheduleReconnect);
+    });
+    subscriber = sub;
+  } catch (error) {
-    console.error(
+    logError(
       "[RealtimePubSub] Failed to subscribe to realtime channel",
       error
     );
     scheduleReconnect();
   }
}

Add near the top of this file:

function safeParse<T>(s: string): T | undefined {
  try { return JSON.parse(s) as T; } catch { return undefined; }
}

The SDK gained SUBSCRIBE via SSE (July 2024) and a Subscriber.ready convenience (PR Apr 15, 2025); aligning to that surface avoids race conditions and handler miswiring. (upstash.com)


async function publishEnvelope(
envelope: DispatchEnvelope,
attempt = 0
): Promise<void> {
try {
await publisher.publish(REALTIME_CHANNEL, JSON.stringify(envelope));
} catch (error) {
if (attempt >= MAX_PUBLISH_RETRIES) {
console.error(
"[RealtimePubSub] Failed to publish realtime event after retries",
error
);
return;
}

const retryDelay = BASE_RETRY_DELAY_MS * 2 ** attempt;
setTimeout(() => {
publishEnvelope(envelope, attempt + 1).catch((retryError) => {
console.error(
"[RealtimePubSub] Failed to publish realtime event",
retryError
);
});
}, retryDelay);
}
}

export function initializeRealtimePubSub(dispatchers: LocalDispatchers): void {
dispatchersRef = dispatchers;

startSubscription();
}

export function publishToConnection(
connectionId: string,
event: RealtimeEvent
): Promise<void> {
const envelope: DispatchEnvelope = {
sourceId: instanceId,
target: { type: "connection", id: connectionId },
event,
};

return publishEnvelope(envelope);
}

export function publishToVisitor(
visitorId: string,
event: RealtimeEvent,
options?: DispatchOptions
): Promise<void> {
const exclude = normalizeExclude(options);
const envelope: DispatchEnvelope = {
sourceId: instanceId,
target: {
type: "visitor",
id: visitorId,
exclude,
},
event,
};

return publishEnvelope(envelope);
}

export function publishToWebsite(
websiteId: string,
event: RealtimeEvent,
options?: DispatchOptions
): Promise<void> {
const exclude = normalizeExclude(options);
const envelope: DispatchEnvelope = {
sourceId: instanceId,
target: {
type: "website",
id: websiteId,
exclude,
},
event,
};

return publishEnvelope(envelope);
}
Loading
Loading