Skip to content

Commit 9c50000

Browse files
committed
fix double handler subscription and pass tests
1 parent a9270b3 commit 9c50000

8 files changed

Lines changed: 848 additions & 1166 deletions

File tree

src/subscribe/client.ts

Lines changed: 88 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
1-
// src/subscribe/client.ts (modified)
1+
// src/subscribe/client.ts (fixed with multiple handlers per channel)
22
import postgres from 'postgres';
33
import { SubscribeOptions } from '../types/core';
44

5+
type NotificationHandler<T> = (payload: T) => void | Promise<void>;
6+
7+
interface ChannelSubscription {
8+
listenRequest: postgres.ListenRequest;
9+
handlers: Set<NotificationHandler<any>>;
10+
}
11+
512
/**
613
* Client for subscribing to PostgreSQL notifications
714
*/
815
class SubscriptionClient<Client> {
916
private sql: postgres.Sql;
10-
private activeSubscriptions: Map<string, postgres.ListenRequest> = new Map();
17+
private activeChannels: Map<string, ChannelSubscription> = new Map();
1118

1219
/**
1320
* Creates a new SubscriptionClient instance
@@ -37,13 +44,8 @@ class SubscriptionClient<Client> {
3744
onError = (error) => console.error('Subscription error:', error)
3845
} = options;
3946

40-
// Check if already subscribed
41-
if (this.activeSubscriptions.has(channel)) {
42-
throw new Error(`Already subscribed to channel: ${channel}`);
43-
}
44-
45-
// Create the subscription
46-
const listenRequest = this.sql.listen(channel, async (payload) => {
47+
// Create a typed handler function
48+
const handler: NotificationHandler<T> = async (payload: any) => {
4749
try {
4850
// Parse the payload
4951
const parsedPayload = parser(payload);
@@ -58,31 +60,79 @@ class SubscriptionClient<Client> {
5860
} catch (error) {
5961
onError(error as Error, payload);
6062
}
61-
});
63+
};
64+
65+
// Check if channel already has a postgres LISTEN
66+
const existingSubscription = this.activeChannels.get(channel);
6267

63-
// Store the subscription
64-
this.activeSubscriptions.set(channel, listenRequest);
68+
if (existingSubscription) {
69+
// Channel already exists - just add this handler
70+
existingSubscription.handlers.add(handler);
71+
} else {
72+
// New channel - create postgres LISTEN and handler set
73+
const listenRequest = this.sql.listen(
74+
channel,
75+
async (payload: string) => {
76+
// Get current handlers for this channel
77+
const subscription = this.activeChannels.get(channel);
78+
if (subscription) {
79+
// Call all handlers for this channel
80+
const handlerPromises = Array.from(subscription.handlers).map((h) =>
81+
Promise.resolve(h(payload)).catch((error) =>
82+
onError(error as Error, payload)
83+
)
84+
);
85+
await Promise.allSettled(handlerPromises);
86+
}
87+
}
88+
);
89+
90+
// Store the channel subscription
91+
this.activeChannels.set(channel, {
92+
listenRequest,
93+
handlers: new Set([handler])
94+
});
95+
}
6596
}
6697

6798
/**
68-
* Unsubscribes from notifications on a specific channel
99+
* Unsubscribes a specific handler from notifications on a channel
69100
*
70101
* @param channel - The notification channel to unsubscribe from
102+
* @param handler - The specific handler to remove (optional - removes all if not provided)
71103
* @returns A promise that resolves when unsubscribed
72104
*/
73-
public async unsubscribe(channel: string): Promise<void> {
74-
const listenRequest = this.activeSubscriptions.get(channel);
105+
public async unsubscribe(
106+
channel: string,
107+
handler?: NotificationHandler<any>
108+
): Promise<void> {
109+
const subscription = this.activeChannels.get(channel);
110+
111+
if (!subscription) {
112+
console.warn(`Not subscribed to channel: ${channel}`);
113+
return;
114+
}
75115

76-
if (!listenRequest) {
77-
throw new Error(`Not subscribed to channel: ${channel}`);
116+
if (handler) {
117+
// Remove specific handler
118+
subscription.handlers.delete(handler);
119+
} else {
120+
// Remove all handlers for this channel
121+
subscription.handlers.clear();
78122
}
79123

80-
// Unlisten
81-
const meta = await listenRequest;
82-
await meta.unlisten();
124+
// If no handlers left, unlisten from postgres
125+
if (subscription.handlers.size === 0) {
126+
try {
127+
const meta = await subscription.listenRequest;
128+
await meta.unlisten();
129+
} catch (error) {
130+
console.warn(`Error unlistening from ${channel}:`, error);
131+
}
83132

84-
// Remove from active subscriptions
85-
this.activeSubscriptions.delete(channel);
133+
// Remove channel completely
134+
this.activeChannels.delete(channel);
135+
}
86136
}
87137

88138
/**
@@ -91,14 +141,29 @@ class SubscriptionClient<Client> {
91141
* @returns A promise that resolves when all unsubscriptions are complete
92142
*/
93143
public async unsubscribeAll(): Promise<void> {
94-
const unsubscribePromises = Array.from(this.activeSubscriptions.keys()).map(
144+
const unsubscribePromises = Array.from(this.activeChannels.keys()).map(
95145
async (channel) => {
96146
await this.unsubscribe(channel);
97147
}
98148
);
99149

100150
await Promise.all(unsubscribePromises);
101151
}
152+
153+
/**
154+
* Get debug info about active subscriptions
155+
*/
156+
public getActiveChannels(): string[] {
157+
return Array.from(this.activeChannels.keys());
158+
}
159+
160+
/**
161+
* Get handler count for a specific channel
162+
*/
163+
public getHandlerCount(channel: string): number {
164+
const subscription = this.activeChannels.get(channel);
165+
return subscription ? subscription.handlers.size : 0;
166+
}
102167
}
103168

104169
// Export the class

src/trigger/registry.ts

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -214,34 +214,45 @@ export class Registry<Client> {
214214
async startListening(): Promise<void> {
215215
if (this.isListening) return;
216216

217+
const subscribedChannels = new Set<string>();
218+
217219
// Listen to model channels
218220
for (const [, channelName] of this.modelChannels) {
219-
await this.subscriptionClient.subscribe(channelName, {
220-
onNotification: (payload: any) => {
221-
this.handleNotification(channelName, payload);
222-
}
223-
});
221+
if (!subscribedChannels.has(channelName)) {
222+
await this.subscriptionClient.subscribe(channelName, {
223+
onNotification: (payload: any) => {
224+
this.handleNotification(channelName, payload);
225+
}
226+
});
227+
subscribedChannels.add(channelName);
228+
}
224229
}
225230

226231
// Listen to custom channels
227232
for (const [channelName] of this.customChannels) {
228-
await this.subscriptionClient.subscribe(channelName, {
229-
onNotification: (payload: any) => {
230-
this.handleNotification(channelName, payload);
231-
}
232-
});
233+
if (!subscribedChannels.has(channelName)) {
234+
await this.subscriptionClient.subscribe(channelName, {
235+
onNotification: (payload: any) => {
236+
this.handleNotification(channelName, payload);
237+
}
238+
});
239+
subscribedChannels.add(channelName);
240+
}
233241
}
234242

235243
// Listen to custom trigger channels
236244
for (const [triggerName] of this.modelTriggers) {
237245
if (triggerName.includes('_custom_')) {
238246
const [, , triggerShortName] = triggerName.split('_');
239247
const channelName = `${triggerShortName}_events`;
240-
await this.subscriptionClient.subscribe(channelName, {
241-
onNotification: (payload: any) => {
242-
this.handleNotification(channelName, payload);
243-
}
244-
});
248+
if (!subscribedChannels.has(channelName)) {
249+
await this.subscriptionClient.subscribe(channelName, {
250+
onNotification: (payload: any) => {
251+
this.handleNotification(channelName, payload);
252+
}
253+
});
254+
subscribedChannels.add(channelName);
255+
}
245256
}
246257
}
247258

0 commit comments

Comments
 (0)