feat(api): add redis-based websocket pubsub#12
Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds a local WebSocket connection registry and a Redis-backed realtime Pub/Sub dispatcher, refactors socket handling to publish/subscribe dispatchers, and updates tests/types to mock and accommodate the new pub/sub layer. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as WebSocket Client
participant Socket as Socket Handler
participant Registry as Connection Registry
participant PubSub as Realtime PubSub
participant Redis as Upstash Redis
Client->>Socket: Upgrade request / open
Socket->>Socket: authenticateWebSocketConnection()
alt auth success
Socket->>Registry: register connection (localConnections)
Socket->>PubSub: publish USER_CONNECTED envelope
PubSub->>Redis: publish envelope (realtime:dispatch)
Redis->>PubSub: deliver envelope (all instances)
PubSub->>Registry: dispatch -> dispatchEventToLocal*()
Registry->>Socket: socket.send(JSON.stringify(event))
else auth failure
Socket-->>Client: send error / close
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (11)
apps/api/src/ws/socket.test.ts (1)
57-62: Good: isolate pub/sub via mocksThese no-op publishers keep tests hermetic. Consider adding assertions that publish helpers are called in tests that exercise dispatch.
I can add a test that calls sendEventToWebsite/Visitor/Connection and asserts the corresponding publish function was invoked. Want me to draft it?
apps/api/src/ws/connection-registry.ts (3)
31-40: Harden send path and replace console with structured logger
- Add a readyState guard to avoid throwing on closed sockets.
- Avoid console.* per guidelines; inject/use a logger.
function sendEventToSocket( record: LocalConnectionRecord, serializedEvent: string ): void { try { - record.socket.send(serializedEvent); + // Bun ServerWebSocket: 1=open, 3=closed (align with your ws impl) + if ((record.socket as any).readyState === 1) { + record.socket.send(serializedEvent); + } } catch (error) { - console.error("[WebSocket] Failed to send event:", error); + // TODO: use shared logger, e.g., log.error(...) + // log.error({ err: error }, "WebSocket failed to send event"); } }Please confirm the exact readyState semantics for Bun’s ServerWebSocket in your runtime; adjust the numeric constant accordingly.
81-110: Website broadcast filters to user sessions onlyIntentional gate via userId is clear. Same console/logging note applies.
17-29: Exclude helper is fine; tiny nit on undefined returnReturning
undefinedimplicitly is fine; if you prefer explicitness:- if (!options?.exclude) { - return; - } + if (!options?.exclude) return undefined as undefined;apps/api/src/ws/realtime-pubsub.ts (3)
97-121: Be defensive when decoding incoming messageIf the SDK yields strings,
handleEnvelopeshould parse. Current code assumes typed envelopes.-function handleEnvelope(envelope: DispatchEnvelope | undefined): void { +function handleEnvelope(envelope: DispatchEnvelope | string | undefined): void { if (!envelope) { return; } - const dispatchers = dispatchersRef; + const parsed: DispatchEnvelope | undefined = + typeof envelope === "string" ? safeParse<DispatchEnvelope>(envelope) : envelope; + if (!parsed) return; + const dispatchers = dispatchersRef; if (!dispatchers) { return; } - const { event, target } = envelope; + const { event, target } = parsed;
23-26: Instance id generation: ensure portability
process.pidmay be undefined in some non-Node/Bun targets. If this module ever runs outside Node/Bun, gate it.-const instanceId = `api-${process.pid ?? "pid"}-${Math.random() +const instanceId = `api-${(globalThis as any).process?.pid ?? "pid"}-${Math.random() .toString(36) .slice(2, 10)}`;
200-225: Redundant custom retry around publishUpstash TS SDK already retries network-failing requests 5× with exponential backoff (Math.exp(retryCount) * 50 ms). The custom retry in publishEnvelope duplicates that behavior and can cause compounded retries/bursts — remove the local retry block in apps/api/src/ws/realtime-pubsub.ts (lines 200–225), or disable/adjust the SDK's retry options in the Redis constructor and centralize backoff logic.
apps/api/src/ws/socket.ts (4)
108-111: Reuse WSContext type instead of redefining SocketContext.Avoid type drift; import WSContext (or re-export it) and narrow as needed.
+import type { WSContext } from "@api/utils/websocket-connection"; -type SocketContext = { - raw?: RawSocket; - send: (data: string) => void; -}; +type SocketContext = Pick<WSContext, "raw" | "send">;Confirm WSContext is exported from @api/utils/websocket-connection; if not, re-export it there.
296-313: Harden Authorization parsing.Avoid split(" ") and only accept Bearer tokens to reduce ambiguity.
-const authHeader = c.req.header("Authorization"); -let privateKey = authHeader?.split(" ")[1]; +const authHeader = c.req.header("Authorization") ?? ""; +const bearer = /^Bearer\s+(.+)$/.exec(authHeader); +let privateKey = bearer?.[1];
55-62: Remove or export ConnectionData (unused).It appears unused; drop it or export only if part of the public API.
-export type ConnectionData = { - connectionId: string; - userId?: string; - connectedAt: number; - apiKey?: ApiKeyWithWebsiteAndOrganization; - organizationId?: string; - websiteId?: string; -}; +// (remove if unused)
792-797: Don’t label handler failures as “Invalid message format”.Route errors aren’t parsing errors; return a generic internal error to clients and log details server-side.
-} catch (error) { - console.error("[WebSocket] Error processing message:", error); - sendInvalidFormatResponse(ws, error); -} +} catch (error) { + console.error("[WebSocket] Error processing message:", error); + sendError(ws, { error: "Internal error", message: "Failed to process event." }); +}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
apps/api/src/ws/connection-registry.ts(1 hunks)apps/api/src/ws/realtime-pubsub.ts(1 hunks)apps/api/src/ws/router.test.ts(1 hunks)apps/api/src/ws/router.ts(1 hunks)apps/api/src/ws/socket.test.ts(2 hunks)apps/api/src/ws/socket.ts(2 hunks)
🧰 Additional context used
📓 Path-based instructions (9)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/global-rules.mdc)
**/*.{ts,tsx}: Manage shared component state via React Context
Implement full TypeScript support for all public APIs
Provide proper type definitions for all props and refs
Use appropriate generic constraints where necessary
Export type definitions for the public API
Avoid TypeScript enums; prefer const objects with 'as const' or Object.freeze
Provide clear, descriptive runtime errors for missing context/props and misuse
Expose dedicated hooks for context access that throw helpful errors when context is missing
Add explicit error messages for: outside-context usage, missing required props, invalid prop combinations, and validation failures
Document each component with JSDoc: purpose, props (required/optional), context requirements, example usage, pitfalls
State management: use React Context for global state, perform correct state updates, and handle side effects appropriately
Validation: implement runtime checks and validate prop combinations with clear errors
Create context with createContext<ContextType|null>(null) and a useContext hook that throws a clear error when missing
**/*.{ts,tsx}: Don't use primitive type aliases or misleading types
Don't use empty type parameters in type aliases and interfaces
Don't use any or unknown as type constraints
Don't have unused function parameters
Don't have unused private class members
Don't return a value from a function that has a 'void' return type
Don't use the TypeScript directive @ts-ignore
Ensure switch-case statements are exhaustive (especially with unions)
Don't use TypeScript enums
Don't add type annotations to variables, parameters, and class properties initialized with literal expressions
Don't use TypeScript namespaces
Don't use non-null assertions with the ! postfix operator
Don't use parameter properties in class constructors
Don't use user-defined types (where discouraged by policy)
Use as const instead of literal types and type annotations
Use either T[] or Array consistently
Use consistent accessibili...
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{js,jsx,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,jsx,ts,tsx}: Don't use theaccessKeyattribute on any HTML element
Use correct ISO language/country codes for the lang attribute
Don't hardcode sensitive data like API keys and tokens
Use === and !==
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,ts}: Don't use consecutive spaces in regular expression literals
Don't use the arguments object
Don't use the comma operator
Don't use unnecessary boolean casts
Don't use unnecessary callbacks with flatMap
Prefer for...of statements instead of Array.forEach
Don't use this and super in static contexts
Don't use unnecessary catch clauses
Don't use unnecessary constructors
Don't use unnecessary continue statements
Don't export empty modules that don't change anything
Don't use unnecessary escape sequences in regular expression literals
Don't use unnecessary labels
Don't use unnecessary nested block statements
Don't rename imports, exports, and destructured assignments to the same name
Don't use unnecessary string or template literal concatenation
Don't use String.raw in template literals when there are no escape sequences
Don't use useless case statements in switch statements
Avoid ternary operators when simpler alternatives exist
Don't use useless this aliasing
Don't initialize variables to undefined
Don't use void operators
Use Date.now() to get milliseconds since the Unix Epoch
Prefer .flatMap() over map().flat() when possible
Use literal property access instead of computed property access
Don't use parseInt() or Number.parseInt() when numeric literals suffice (binary/octal/hex)
Use concise optional chaining instead of chained logical expressions
Prefer regular expression literals over the RegExp constructor when possible
Don't use non–base-10 or underscore-separated number literal object member names
Remove redundant terms from logical expressions
Prefer while loops over for loops when no initializer/update is needed
Don't reassign const variables
Don't use constant expressions in conditions
Don't use Math.min/Math.max to clamp values when result is constant
Don't use empty character classes in regular expression literals
Don't use empty destructuring patterns
Don't call global object properties as functions
Don't declare functions and vars that are acce...
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,ts,tsx}: Don't create classes that only have static members
Don't return a value from a constructor
Don't use super() incorrectly inside classes; ensure super() is called in derived constructors
Ensure super() is called exactly once on every code path in a class constructor before this is accessed if the class has a superclass
Place getters and setters for the same property next to each other in class and object definitions
Ensure JSDoc comment lines start with a single asterisk, except for the first one
Don't use duplicate class members
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{js,tsx,ts,jsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
Use arrow functions instead of function expressions
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts,jsx,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,ts,jsx,tsx}: Prevent duplicate polyfills from Polyfill.io
Don't use irregular whitespace characters
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
Don't redeclare variables, functions, classes, and types in the same scope
Files:
apps/api/src/ws/router.tsapps/api/src/ws/router.test.tsapps/api/src/ws/connection-registry.tsapps/api/src/ws/socket.test.tsapps/api/src/ws/socket.tsapps/api/src/ws/realtime-pubsub.ts
**/*.{test.ts,test.tsx,spec.ts,spec.tsx,test.js,test.jsx,spec.js,spec.jsx}
📄 CodeRabbit inference engine (.cursor/rules/global-rules.mdc)
Include tests: unit tests for components, integration tests for composed components, accessibility tests, and error boundary tests
Files:
apps/api/src/ws/router.test.tsapps/api/src/ws/socket.test.ts
**/*.{test,spec}.{js,jsx,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{test,spec}.{js,jsx,ts,tsx}: Don't nest describe() blocks too deeply in test files
Don't use callbacks in asynchronous tests and hooks
Don't have duplicate hooks in describe blocks
Don't use export or module.exports in test files
Don't use focused tests
Ensure the assertion function (e.g., expect) is placed inside an it() function call
Don't use disabled tests
Files:
apps/api/src/ws/router.test.tsapps/api/src/ws/socket.test.ts
🧬 Code graph analysis (4)
apps/api/src/ws/connection-registry.ts (1)
packages/types/src/realtime-events.ts (1)
RealtimeEvent(60-64)
apps/api/src/ws/socket.test.ts (1)
packages/types/src/realtime-events.ts (1)
RealtimeEvent(60-64)
apps/api/src/ws/socket.ts (7)
apps/api/src/db/queries/session.ts (3)
ApiKeyWithWebsiteAndOrganization(12-15)normalizeSessionToken(19-36)resolveSession(38-87)apps/api/src/ws/realtime-pubsub.ts (4)
initializeRealtimePubSub(227-231)publishToConnection(233-244)publishToVisitor(246-263)publishToWebsite(265-282)apps/api/src/ws/connection-registry.ts (6)
dispatchEventToLocalConnection(42-53)dispatchEventToLocalVisitor(55-79)dispatchEventToLocalWebsite(81-110)RawSocket(5-5)LocalConnectionRecord(7-13)localConnections(15-15)apps/api/src/ws/router.ts (2)
EventContext(183-183)routeEvent(160-178)apps/api/src/utils/websocket-connection.ts (8)
getConnectionIdFromSocket(134-138)sendError(140-142)handleAuthenticationFailure(27-37)handleIdentificationFailure(39-49)storeConnectionId(51-56)sendConnectionEstablishedMessage(58-76)createConnectionEvent(78-113)updatePresenceIfNeeded(115-132)packages/types/src/realtime-events.ts (3)
RealtimeEvent(60-64)isValidEventType(84-86)validateRealtimeEvent(73-79)apps/api/src/lib/auth-validation.ts (3)
AuthValidationOptions(23-27)AuthValidationError(34-42)performAuthentication(382-431)
apps/api/src/ws/realtime-pubsub.ts (2)
apps/api/src/env.ts (1)
env(27-57)packages/types/src/realtime-events.ts (3)
RealtimeEvent(60-64)isValidEventType(84-86)validateRealtimeEvent(73-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Cursor Bugbot
🔇 Additional comments (10)
apps/api/src/ws/router.ts (1)
180-187: Type export reorder looks goodPurely cosmetic; no behavioral change.
apps/api/src/ws/router.test.ts (1)
4-4: Import order change is fineNo runtime effect.
apps/api/src/ws/socket.test.ts (1)
6-6: Tuple-array typing tweak is correct
[RealtimeEvent, EventContext][]is clear and precise.apps/api/src/ws/connection-registry.ts (2)
42-53: Direct-to-connection dispatch is fineLooks correct and side‑effect free if connection is missing.
55-79: Visitor broadcast: good use of pre-serialization and exclusionEfficient loop; honors exclude.
apps/api/src/ws/realtime-pubsub.ts (2)
83-96: Unsubscribe method name may differYou call
unsubscribe(). Some SDKs exposeclose()/abort()or return a disposer. Verify and align.[suggest_minor_issue]
- await activeSubscriber.unsubscribe(); + // If API differs, prefer the actual disposer: + await (activeSubscriber as any).unsubscribe?.() ?? (activeSubscriber as any).close?.();
134-159: Good: payload validation per type schemaValidating type and payload before dispatching is solid.
apps/api/src/ws/socket.ts (3)
70-80: Publish helpers look good.Fire-and-forget with error logging is appropriate for dispatcher contracts.
Also applies to: 82-93, 95-106
783-791: No handler reads context.ws — setting ws: undefined is safe.Searched the repo for
context.ws/ctx.ws(no matches); router handlers only use sendToConnection / sendToVisitor / sendToWebsite and metadata. EventContext is defined at apps/api/src/ws/router.ts:27; contexts are created withws: undefinedin apps/api/src/ws/socket.ts and apps/api/src/utils/websocket-message.ts.
64-68: No action required — Pub/Sub init already guarded.startSubscription() short-circuits when a subscriber exists (
if (subscriber) return) so repeated calls to initializeRealtimePubSub (which only sets dispatchersRef then calls startSubscription) will not create duplicate Redis subscriptions; only caller found is apps/api/src/ws/socket.ts.
| try { | ||
| const nextSubscriber = | ||
| subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL); | ||
| nextSubscriber.on("message", ({ message }) => { | ||
| handleEnvelope(message); | ||
| }); | ||
| nextSubscriber.on("error", (error) => { | ||
| console.error("[RealtimePubSub] Subscription error", error); | ||
| teardownSubscriber() | ||
| .catch((teardownError) => { | ||
| console.error( | ||
| "[RealtimePubSub] Failed to teardown subscriber after error", | ||
| teardownError | ||
| ); | ||
| }) | ||
| .finally(() => { | ||
| scheduleReconnect(); | ||
| }); | ||
| }); | ||
|
|
||
| subscriber = nextSubscriber; | ||
| } catch (error) { | ||
| console.error( | ||
| "[RealtimePubSub] Failed to subscribe to realtime channel", | ||
| error | ||
| ); | ||
| scheduleReconnect(); | ||
| } | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
Likely incorrect subscribe API usage (shape and sync/async assumptions)
Code assumes an EventEmitter-style .on("message"|"error") and a sync subscribe(); Upstash’s docs emphasize REST/SSE for SUBSCRIBE, and examples often use ioredis for subscriptions. Verify the actual TS SDK API; you may need await redis.subscribe({ channel, onMessage }), and to JSON.parse payloads.
- try {
- const nextSubscriber =
- subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL);
- nextSubscriber.on("message", ({ message }) => {
- handleEnvelope(message);
- });
- nextSubscriber.on("error", (error) => {
- console.error("[RealtimePubSub] Subscription error", error);
- teardownSubscriber()
- .catch((teardownError) => {
- console.error(
- "[RealtimePubSub] Failed to teardown subscriber after error",
- teardownError
- );
- })
- .finally(() => {
- scheduleReconnect();
- });
- });
- subscriber = nextSubscriber;
- } catch (error) {
+ try {
+ // Adjust to the actual SDK signature; many installs require `await`.
+ const sub = await (subscriberClient as any).subscribe(REALTIME_CHANNEL, (raw: unknown) => {
+ const msg = typeof raw === "string" ? safeParse<DispatchEnvelope>(raw) : (raw as DispatchEnvelope | undefined);
+ handleEnvelope(msg);
+ });
+ // If the SDK provides an error callback, wire it here:
+ // sub.onError?.((error: unknown) => { ... });
+ subscriber = sub;
+ } catch (error) {
console.error(
"[RealtimePubSub] Failed to subscribe to realtime channel",
error
);
scheduleReconnect();
}Add a small helper near the top:
function safeParse<T>(s: string): T | undefined {
try { return JSON.parse(s) as T; } catch { return undefined; }
}Citations for context on publish/subscribe and SSE: Upstash TS publish docs and REST SUBSCRIBE docs. (upstash.com)
🌐 Web query:
What is the correct TypeScript subscribe API for @upstash/redis Pub/Sub (callback signature, return type, and error handling) as of September 2025?
💡 Result:
Short answer: there is no long‑running, callback‑style TypeScript subscribe() in @upstash/redis — the HTTP/REST SDK exposes Redis pub/sub commands (PUBLISH and the PUBSUB info commands) but not a persistent server‑push SUBSCRIBE websocket/tcp stream with a callback signature. Use PUBLISH/PUBSUB via the SDK or use a standard TCP Redis client (ioredis/node‑redis) for real time SUBSCRIBE/PSUBSCRIBE callbacks.
Key citations:
- Upstash REST API feature table: Pub/Sub — only PUBLISH and PUBSUB are supported via REST. [1]
- SDK docs / changelog: the @upstash/redis SDK is an HTTP/REST client; errors are thrown by commands (not returned as {data,error}) — i.e., follow normal async/await try/catch error handling. [2][3]
Implication / recommended usage (concise):
- If you need a callback signature like (channel: string, message: string) => void and a return/unsubscribe function, use a TCP Redis client (ioredis or node‑redis) and its subscribe API (it provides subscribe(channel, cb) or emitter events and returns an unsubscribe). Upstash’s HTTP SDK only supports publishing and PUBSUB inspection; handle errors with try/catch around async calls. [1][2]
References:
- Upstash REST API / Features (Pub/Sub support notes). [1]
- Upstash TypeScript SDK docs (overview / get started / advanced — SDK is HTTP based; errors thrown). [2][3]
If you want, I can show (a) example code using ioredis subscribe(callback) or (b) how to publish / inspect pubsub with @upstash/redis and handle errors via try/catch.
Fix pub/sub: @upstash/redis has no long‑running subscribe — change implementation
apps/api/src/ws/realtime-pubsub.ts (lines 170–198): the code assumes an EventEmitter-style, long‑running subscribe(...) with .on("message"|"error"). @upstash/redis is an HTTP/REST client (no persistent SUBSCRIBE callback) — this will fail if subscriberClient is an Upstash client.
- If you need real‑time SUBSCRIBE, replace subscriberClient with a TCP Redis client (ioredis or node‑redis) and use its subscribe/
client.on('message', ...)API; wire unsubscribe/close and error handlers accordingly. - If you must keep @upstash/redis, stop relying on .on('message') and instead use PUBLISH + PUBSUB inspection (or add a separate SSE/WebSocket layer or polling) for real‑time delivery.
- Parse payloads safely (JSON.parse with try/catch or a safeParse helper) before calling handleEnvelope and validate the parsed object.
- Ensure teardown/reconnect logic is triggered from the chosen client's real error/unsubscribe callbacks and that async subscribe calls are awaited and wrapped in try/catch.
🤖 Prompt for AI Agents
In apps/api/src/ws/realtime-pubsub.ts around lines 170–198 the code incorrectly
treats subscriberClient as an EventEmitter long‑running subscriber (using
.on("message"| "error")) which does not work with @upstash/redis HTTP client;
replace or adapt the implementation: either swap subscriberClient for a TCP
Redis client (ioredis or node‑redis) and use its subscribe/await subscribe call
plus client.on('message') and client.on('error') handlers (ensure
unsubscribe/quit teardown and reconnection logic are wired to those events), or
if you must keep @upstash/redis, remove the .on handlers and implement an
alternative (polling, SSE/websocket, or inspect PUBSUB via REST) for real‑time
delivery; additionally, always parse incoming payloads with a safe JSON.parse
try/catch (or safeParse helper) and validate the parsed DispatchEnvelope before
calling handleEnvelope, and ensure all async subscribe/teardown calls are
awaited and wrapped in try/catch so teardown/reconnect is triggered from real
error/unsubscribe outcomes.
| function parseRealtimeEventMessage( | ||
| rawMessage: unknown, | ||
| ws: SocketContext | ||
| ): RealtimeEvent | null { | ||
| let message: { data: unknown; type?: string }; | ||
|
|
||
| try { | ||
| message = JSON.parse(String(rawMessage)); | ||
| } catch (error) { | ||
| console.error("[WebSocket] Error parsing message:", error); | ||
| sendInvalidFormatResponse(ws, error); | ||
| return null; | ||
| } | ||
|
|
||
| console.log("[WebSocket] Dispatching website event", { | ||
| websiteId, | ||
| connectionId, | ||
| eventType: event.type, | ||
| if (!(message.type && isValidEventType(message.type))) { | ||
| console.error(`[WebSocket] Invalid event type: ${message.type}`); | ||
| sendError(ws, { | ||
| error: "Invalid event type", | ||
| message: `Invalid event type: ${message.type}`, | ||
| }); | ||
| sendEventToSocket(connection, serializedEvent); | ||
| return null; | ||
| } | ||
| }; | ||
|
|
||
| export const sendEventToConnection = dispatchToConnection; | ||
| export const sendEventToVisitor = dispatchToVisitor; | ||
| export const sendEventToWebsite = dispatchToWebsite; | ||
| const validatedData = validateRealtimeEvent(message.type, message.data); | ||
|
|
||
| return { | ||
| type: message.type, | ||
| data: validatedData, | ||
| timestamp: Date.now(), | ||
| }; | ||
| } |
There was a problem hiding this comment.
Handle binary frames and catch schema errors to avoid crashes.
String(rawMessage) corrupts Uint8Array/ArrayBuffer payloads; validateRealtimeEvent can throw and currently propagates unhandled.
-function parseRealtimeEventMessage(
+function parseRealtimeEventMessage(
rawMessage: unknown,
ws: SocketContext
): RealtimeEvent | null {
- let message: { data: unknown; type?: string };
-
- try {
- message = JSON.parse(String(rawMessage));
- } catch (error) {
- console.error("[WebSocket] Error parsing message:", error);
- sendInvalidFormatResponse(ws, error);
- return null;
- }
+ let jsonStr: string;
+ if (typeof rawMessage === "string") {
+ jsonStr = rawMessage;
+ } else if (rawMessage instanceof Uint8Array) {
+ jsonStr = new TextDecoder().decode(rawMessage);
+ } else if (rawMessage instanceof ArrayBuffer) {
+ jsonStr = new TextDecoder().decode(new Uint8Array(rawMessage));
+ } else {
+ sendInvalidFormatResponse(ws, new Error("Unsupported message type"));
+ return null;
+ }
+
+ let message: { data: unknown; type?: string };
+ try {
+ message = JSON.parse(jsonStr);
+ } catch (error) {
+ console.error("[WebSocket] Error parsing message:", error);
+ sendInvalidFormatResponse(ws, error);
+ return null;
+ }
if (!(message.type && isValidEventType(message.type))) {
console.error(`[WebSocket] Invalid event type: ${message.type}`);
sendError(ws, {
error: "Invalid event type",
message: `Invalid event type: ${message.type}`,
});
return null;
}
- const validatedData = validateRealtimeEvent(message.type, message.data);
+ let validatedData;
+ try {
+ validatedData = validateRealtimeEvent(message.type, message.data);
+ } catch (error) {
+ console.error("[WebSocket] Invalid event data:", error);
+ sendInvalidFormatResponse(ws, error);
+ return null;
+ }
return {
type: message.type,
data: validatedData,
timestamp: Date.now(),
};
}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/api/src/ws/socket.ts around lines 166 to 196, the parser currently uses
String(rawMessage) which corrupts binary frames (Uint8Array/ArrayBuffer) and
lets validateRealtimeEvent exceptions bubble up; update the function to detect
ArrayBuffer/Uint8Array and decode them with TextDecoder (fallback to String for
other types), wrap the JSON.parse and the validateRealtimeEvent call each in
try/catch so schema/validation errors are caught, and on any parse/validation
error send the appropriate sendInvalidFormatResponse or sendError to the client
and return null rather than throwing.
| return `conn_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`; | ||
| } |
There was a problem hiding this comment.
Use cryptographically-strong connection IDs.
Math.random() is weak and increases collision risk across instances.
-export function generateConnectionId(): string {
- return `conn_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
-}
+export function generateConnectionId(): string {
+ return `conn_${crypto.randomUUID()}`;
+}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/api/src/ws/socket.ts around lines 205 to 206, the connection ID
generator uses Math.random() which is not cryptographically strong; replace it
with a secure generator (e.g., Node's crypto.randomUUID() or crypto.randomBytes)
to produce collision-resistant IDs, and update the return to combine Date.now()
with the secure value (or just use the UUID alone) so IDs are unpredictable and
safe across instances.
| console.log("[WebSocket Auth] Extracted credentials:", { | ||
| hasPrivateKey: !!privateKey, | ||
| hasPublicKey: !!publicKey, | ||
| publicKey: publicKey ? `${publicKey.substring(0, 10)}...` : null, | ||
| origin, | ||
| secWebSocketOrigin, | ||
| referer, | ||
| actualOrigin, | ||
| visitorId: visitorId ? `${visitorId.substring(0, 8)}...` : null, | ||
| }); | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Replace console. with a central logger (applies throughout this file).*
Guidelines prohibit console; use a logger with levels and sampling. Instances at e.g. Lines 219-220, 233-236, 333-343, 392-394, 459-466, 474-485, 513-519, 569-571, 586-587, 677-679, 720-722, 742-744, 746-753, 795-797, 807-809, 820-821.
-import { console } from "global";
+import { logger } from "@api/lib/logger"; // example
-console.error("[WebSocket] ...", err);
+logger.error({ err }, "[WebSocket] ...");Committable suggestion skipped: line range outside the PR's diff.
| function extractFromRequest(c: Context): { | ||
| protocol: string | undefined; | ||
| hostname: string | undefined; | ||
| protocol: string | undefined; | ||
| hostname: string | undefined; | ||
| } { | ||
| const hostHeader = c.req.header("Host"); | ||
| if (!hostHeader) { | ||
| return { protocol: undefined, hostname: undefined }; | ||
| } | ||
| const hostHeader = c.req.header("Host"); | ||
| if (!hostHeader) { | ||
| return { protocol: undefined, hostname: undefined }; | ||
| } | ||
|
|
||
| const hostname = hostHeader.split(":")[0]; | ||
| const isSecure = c.req.url.startsWith("wss://"); | ||
| const protocol = isSecure ? "wss:" : "ws:"; | ||
| const hostname = hostHeader.split(":")[0]; | ||
| const isSecure = c.req.url.startsWith("wss://"); | ||
| const protocol = isSecure ? "wss:" : "ws:"; | ||
|
|
||
| return { protocol, hostname }; | ||
| return { protocol, hostname }; | ||
| } |
There was a problem hiding this comment.
Fix protocol detection when Origin is absent (proxies/CDN).
c.req.url won’t start with wss://; prefer X-Forwarded-Proto/https to infer ws/wss.
-const hostname = hostHeader.split(":")[0];
-const isSecure = c.req.url.startsWith("wss://");
-const protocol = isSecure ? "wss:" : "ws:";
+const hostname = hostHeader.split(":")[0];
+const xfProto = c.req.header("X-Forwarded-Proto")?.toLowerCase();
+const isSecure = xfProto ? xfProto === "https" : c.req.url.startsWith("https://");
+const protocol = isSecure ? "wss:" : "ws:";🤖 Prompt for AI Agents
In apps/api/src/ws/socket.ts around lines 401 to 415, protocol detection
currently checks c.req.url for "wss://" which is incorrect for proxied/CDN
requests and when Origin is absent; update the function to prefer proxy headers
(e.g. the X-Forwarded-Proto header) and common indicators like "https" to decide
between "wss:" and "ws:" (falling back to a sensible default if the header is
missing), keep Host parsing the same, and ensure header lookups use the request
header accessor used elsewhere (e.g. c.req.header("X-Forwarded-Proto")).
| // Add visitorId to the result if authentication was successful | ||
| if (result) { | ||
| result.visitorId = visitorId; | ||
| if (websiteIdParam) { | ||
| result.websiteId = websiteIdParam; | ||
| } | ||
| logAuthSuccess(result); | ||
| } |
There was a problem hiding this comment.
Remove unvalidated websiteId override (privilege escalation).
Overwriting the authenticated websiteId from a query param allows cross-site spoofing. Drop the override (or validate it strictly against the authenticated key/session).
-// Add visitorId to the result if authentication was successful
if (result) {
result.visitorId = visitorId;
- if (websiteIdParam) {
- result.websiteId = websiteIdParam;
- }
logAuthSuccess(result);
}I can add a follow-up that validates websiteIdParam against apiKey.website.id or the user’s activeTeam before allowing overrides.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Add visitorId to the result if authentication was successful | |
| if (result) { | |
| result.visitorId = visitorId; | |
| if (websiteIdParam) { | |
| result.websiteId = websiteIdParam; | |
| } | |
| logAuthSuccess(result); | |
| } | |
| if (result) { | |
| result.visitorId = visitorId; | |
| logAuthSuccess(result); | |
| } |
🤖 Prompt for AI Agents
In apps/api/src/ws/socket.ts around lines 574 to 581, the code unconditionally
overrides the authenticated websiteId with a query param (websiteIdParam), which
enables cross-site privilege escalation; remove the override so result.websiteId
remains the authenticated value, or if an override is required, validate
websiteIdParam explicitly against the authenticated context (e.g.,
apiKey.website.id or the user’s activeTeam membership) before assigning; ensure
visitorId is still set and that logAuthSuccess logs only the
authenticated/validated websiteId.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
apps/api/src/ws/realtime-pubsub.ts (2)
14-20: LGTM: Upstash client config is now valid (no non-existent keepAlive).Good fix removing the non-standard keepAlive flag; Upstash enables HTTP keep‑alive internally, and the SDK exposes agent configuration if you need to override defaults. (upstash.com)
171-173: Parse and validate incoming payloads; don’t assume typed objects.Redis PUB/SUB messages arrive as strings. Always JSON.parse with try/catch (or safeParse) before handleEnvelope to avoid runtime crashes and silent drops.
Apply:
- nextSubscriber.on("message", ({ message }) => { - handleEnvelope(message); - }); + // See startSubscription() proposed fix for safeParse usage.
🧹 Nitpick comments (6)
apps/api/src/ws/realtime-pubsub.ts (6)
81-93: Unsubscribe path likely incorrect for Upstash Subscriber.Calling activeSubscriber.unsubscribe() assumes an instance method. Depending on SDK version, you may need sub.unsubscribe(channel) or sub.close(). Confirm the concrete API and update teardown accordingly.
If your SDK exposes channel-scoped unsubscribe:
- await activeSubscriber.unsubscribe(); + await (activeSubscriber as any).unsubscribe?.(REALTIME_CHANNEL) ?? (activeSubscriber as any).close?.();Please verify against your exact @upstash/redis version’s subscriber API. (github.com)
70-79: Reconnect strategy: add exponential backoff with jitter.A fixed 1s loop can stampede your Redis endpoint during outages. Use capped exponential backoff + jitter.
Example:
-let reconnectTimer: ReturnType<typeof setTimeout> | null = null; +let reconnectTimer: ReturnType<typeof setTimeout> | null = null; +let reconnectAttempts = 0; -function scheduleReconnect(): void { +function scheduleReconnect(): void { if (reconnectTimer) return; - reconnectTimer = setTimeout(() => { + const base = 1000; + const delay = Math.min(30_000, Math.floor(base * 2 ** reconnectAttempts + Math.random() * 250)); + reconnectTimer = setTimeout(() => { reconnectTimer = null; - startSubscription(); + reconnectAttempts++; + void startSubscription(); }, 1000); }Reset reconnectAttempts to 0 on successful subscribe.
198-223: Retry loop: consider promise-based retry to preserve call-site flow and cancellation.setTimeout recursion detaches from the original publish caller and complicates shutdowns. Prefer a simple in-function await delay (with AbortSignal if available).
Sketch:
- const retryDelay = BASE_RETRY_DELAY_MS * 2 ** attempt; - setTimeout(() => { - publishEnvelope(envelope, attempt + 1).catch((retryError) => { - console.error("[RealtimePubSub] Failed to publish realtime event", retryError); - }); - }, retryDelay); + const retryDelay = BASE_RETRY_DELAY_MS * 2 ** attempt; + await new Promise((r) => setTimeout(r, retryDelay)); + return publishEnvelope(envelope, attempt + 1);
58-61: Replace console. with project logger per guidelines.*The codebase guidelines forbid console usage. Swap console.error for a structured logger (e.g., logger.error) and include contextual fields.
Example:
- console.error("[RealtimePubSub] Failed to publish realtime event", retryError); + logError("[RealtimePubSub] Failed to publish realtime event", retryError);Add a thin wrapper:
const logError = (...args: unknown[]) => { /* delegate to your logger */ };
225-229: Initialize does not guard against double calls.If initializeRealtimePubSub is called twice, you can end up with dangling dispatchersRef races. Guard or idempotently teardown/start.
Example:
export function initializeRealtimePubSub(dispatchers: LocalDispatchers): void { - dispatchersRef = dispatchers; - startSubscription(); + dispatchersRef = dispatchers; + void startSubscription(); }And ensure startSubscription handles already-connected state (it does). Consider exposing a shutdown helper for tests.
231-242: Publisher helpers: OK, but consider excluding undefined fields for leaner payloads.Small tidy: omit exclude when empty to keep envelopes compact.
Also applies to: 244-261, 263-280
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
apps/api/src/ws/realtime-pubsub.ts(1 hunks)
🧰 Additional context used
📓 Path-based instructions (7)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/global-rules.mdc)
**/*.{ts,tsx}: Manage shared component state via React Context
Implement full TypeScript support for all public APIs
Provide proper type definitions for all props and refs
Use appropriate generic constraints where necessary
Export type definitions for the public API
Avoid TypeScript enums; prefer const objects with 'as const' or Object.freeze
Provide clear, descriptive runtime errors for missing context/props and misuse
Expose dedicated hooks for context access that throw helpful errors when context is missing
Add explicit error messages for: outside-context usage, missing required props, invalid prop combinations, and validation failures
Document each component with JSDoc: purpose, props (required/optional), context requirements, example usage, pitfalls
State management: use React Context for global state, perform correct state updates, and handle side effects appropriately
Validation: implement runtime checks and validate prop combinations with clear errors
Create context with createContext<ContextType|null>(null) and a useContext hook that throws a clear error when missing
**/*.{ts,tsx}: Don't use primitive type aliases or misleading types
Don't use empty type parameters in type aliases and interfaces
Don't use any or unknown as type constraints
Don't have unused function parameters
Don't have unused private class members
Don't return a value from a function that has a 'void' return type
Don't use the TypeScript directive @ts-ignore
Ensure switch-case statements are exhaustive (especially with unions)
Don't use TypeScript enums
Don't add type annotations to variables, parameters, and class properties initialized with literal expressions
Don't use TypeScript namespaces
Don't use non-null assertions with the ! postfix operator
Don't use parameter properties in class constructors
Don't use user-defined types (where discouraged by policy)
Use as const instead of literal types and type annotations
Use either T[] or Array consistently
Use consistent accessibili...
Files:
apps/api/src/ws/realtime-pubsub.ts
**/*.{js,jsx,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,jsx,ts,tsx}: Don't use theaccessKeyattribute on any HTML element
Use correct ISO language/country codes for the lang attribute
Don't hardcode sensitive data like API keys and tokens
Use === and !==
Files:
apps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,ts}: Don't use consecutive spaces in regular expression literals
Don't use the arguments object
Don't use the comma operator
Don't use unnecessary boolean casts
Don't use unnecessary callbacks with flatMap
Prefer for...of statements instead of Array.forEach
Don't use this and super in static contexts
Don't use unnecessary catch clauses
Don't use unnecessary constructors
Don't use unnecessary continue statements
Don't export empty modules that don't change anything
Don't use unnecessary escape sequences in regular expression literals
Don't use unnecessary labels
Don't use unnecessary nested block statements
Don't rename imports, exports, and destructured assignments to the same name
Don't use unnecessary string or template literal concatenation
Don't use String.raw in template literals when there are no escape sequences
Don't use useless case statements in switch statements
Avoid ternary operators when simpler alternatives exist
Don't use useless this aliasing
Don't initialize variables to undefined
Don't use void operators
Use Date.now() to get milliseconds since the Unix Epoch
Prefer .flatMap() over map().flat() when possible
Use literal property access instead of computed property access
Don't use parseInt() or Number.parseInt() when numeric literals suffice (binary/octal/hex)
Use concise optional chaining instead of chained logical expressions
Prefer regular expression literals over the RegExp constructor when possible
Don't use non–base-10 or underscore-separated number literal object member names
Remove redundant terms from logical expressions
Prefer while loops over for loops when no initializer/update is needed
Don't reassign const variables
Don't use constant expressions in conditions
Don't use Math.min/Math.max to clamp values when result is constant
Don't use empty character classes in regular expression literals
Don't use empty destructuring patterns
Don't call global object properties as functions
Don't declare functions and vars that are acce...
Files:
apps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,ts,tsx}: Don't create classes that only have static members
Don't return a value from a constructor
Don't use super() incorrectly inside classes; ensure super() is called in derived constructors
Ensure super() is called exactly once on every code path in a class constructor before this is accessed if the class has a superclass
Place getters and setters for the same property next to each other in class and object definitions
Ensure JSDoc comment lines start with a single asterisk, except for the first one
Don't use duplicate class members
Files:
apps/api/src/ws/realtime-pubsub.ts
**/*.{js,tsx,ts,jsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
Use arrow functions instead of function expressions
Files:
apps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts,jsx,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
**/*.{js,ts,jsx,tsx}: Prevent duplicate polyfills from Polyfill.io
Don't use irregular whitespace characters
Files:
apps/api/src/ws/realtime-pubsub.ts
**/*.{js,ts,ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/ultracite.mdc)
Don't redeclare variables, functions, classes, and types in the same scope
Files:
apps/api/src/ws/realtime-pubsub.ts
🧬 Code graph analysis (1)
apps/api/src/ws/realtime-pubsub.ts (2)
apps/api/src/env.ts (1)
env(27-57)packages/types/src/realtime-events.ts (3)
RealtimeEvent(60-64)isValidEventType(84-86)validateRealtimeEvent(73-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Cursor Bugbot
🔇 Additional comments (1)
apps/api/src/ws/realtime-pubsub.ts (1)
95-157: Validation and dispatch flow looks solid.Good use of isValidEventType + validateRealtimeEvent and exhaustive switch guarding.
| type SubscriberInstance = ReturnType<typeof subscriberClient.subscribe>; | ||
|
|
There was a problem hiding this comment.
Type bug: SubscriberInstance should await the subscribe() return type.
subscribe() in @upstash/redis returns a Subscriber (often via Promise). Using ReturnType without Awaited makes teardownSubscriber’s unsubscribe() unsafe at compile/runtime.
Apply:
-type SubscriberInstance = ReturnType<typeof subscriberClient.subscribe>;
+type SubscriberInstance = Awaited<ReturnType<typeof subscriberClient.subscribe>>;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| type SubscriberInstance = ReturnType<typeof subscriberClient.subscribe>; | |
| type SubscriberInstance = Awaited<ReturnType<typeof subscriberClient.subscribe>>; |
🤖 Prompt for AI Agents
In apps/api/src/ws/realtime-pubsub.ts around lines 25 to 26, the type alias
SubscriberInstance uses ReturnType<typeof subscriberClient.subscribe> which
doesn’t account for subscribe() returning a Promise; change the alias to await
the promise (e.g., use Awaited<ReturnType<typeof subscriberClient.subscribe>> or
wrap with PromiseResolve) so teardownSubscriber can safely call unsubscribe() at
compile and runtime, and update any related variable declarations to match the
awaited Subscriber type.
| function startSubscription(): void { | ||
| if (!dispatchersRef) { | ||
| return; | ||
| } | ||
|
|
||
| if (subscriber) { | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| const nextSubscriber = | ||
| subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL); | ||
| nextSubscriber.on("message", ({ message }) => { | ||
| handleEnvelope(message); | ||
| }); | ||
| nextSubscriber.on("error", (error) => { | ||
| console.error("[RealtimePubSub] Subscription error", error); | ||
| teardownSubscriber() | ||
| .catch((teardownError) => { | ||
| console.error( | ||
| "[RealtimePubSub] Failed to teardown subscriber after error", | ||
| teardownError | ||
| ); | ||
| }) | ||
| .finally(() => { | ||
| scheduleReconnect(); | ||
| }); | ||
| }); | ||
|
|
||
| subscriber = nextSubscriber; | ||
| } catch (error) { | ||
| console.error( | ||
| "[RealtimePubSub] Failed to subscribe to realtime channel", | ||
| error | ||
| ); | ||
| scheduleReconnect(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Broken subscription wiring: wrong API shape and missing await/ready.
This treats the Upstash subscriber like an EventEmitter with .on("message"/"error") and calls subscribe() synchronously. The SDK exposes a Subscriber for REST/SSE SUBSCRIBE; you should await the subscription and (newer versions) await sub.ready before publishing/consuming. Also parse JSON payloads before dispatch.
Suggested minimal fix (pattern; adjust to your installed SDK version):
-function startSubscription(): void {
+async function startSubscription(): Promise<void> {
if (!dispatchersRef) {
return;
}
if (subscriber) {
return;
}
-
- try {
- const nextSubscriber =
- subscriberClient.subscribe<DispatchEnvelope>(REALTIME_CHANNEL);
- nextSubscriber.on("message", ({ message }) => {
- handleEnvelope(message);
- });
- nextSubscriber.on("error", (error) => {
- console.error("[RealtimePubSub] Subscription error", error);
- teardownSubscriber()
- .catch((teardownError) => {
- console.error(
- "[RealtimePubSub] Failed to teardown subscriber after error",
- teardownError
- );
- })
- .finally(() => {
- scheduleReconnect();
- });
- });
- subscriber = nextSubscriber;
- } catch (error) {
+ try {
+ // Upstash SUBSCRIBE runs over SSE; await the subscriber and its readiness signal when available.
+ const sub = await (subscriberClient as any).subscribe(REALTIME_CHANNEL, (raw: unknown) => {
+ const msg = typeof raw === "string" ? safeParse<DispatchEnvelope>(raw) : (raw as DispatchEnvelope | undefined);
+ handleEnvelope(msg);
+ });
+ await (sub?.ready ?? Promise.resolve());
+ // Prefer real client hooks if provided (e.g. sub.onError):
+ sub?.onError?.((error: unknown) => {
+ logError("[RealtimePubSub] Subscription error", error);
+ teardownSubscriber().finally(scheduleReconnect);
+ });
+ subscriber = sub;
+ } catch (error) {
- console.error(
+ logError(
"[RealtimePubSub] Failed to subscribe to realtime channel",
error
);
scheduleReconnect();
}
}Add near the top of this file:
function safeParse<T>(s: string): T | undefined {
try { return JSON.parse(s) as T; } catch { return undefined; }
}The SDK gained SUBSCRIBE via SSE (July 2024) and a Subscriber.ready convenience (PR Apr 15, 2025); aligning to that surface avoids race conditions and handler miswiring. (upstash.com)
Summary
Testing
https://chatgpt.com/codex/tasks/task_e_68cdbe673b0c832bae5a672b89926ecd
Summary by CodeRabbit
New Features
Improvements
Refactor
Tests