Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit 2823ad4

Browse files
committed
fix(ws): subscription unsubscribing
1 parent 307705b commit 2823ad4

4 files changed

Lines changed: 13 additions & 4 deletions

File tree

packages/application/lib/events.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ export class EventManager<
105105
const subscription = subscriptions.get(eventKey)
106106
if (!subscription) return false
107107
await this.subManager.unsubscribe(subscription)
108-
subscription.emit('unsubscribe')
108+
subscription.end()
109109
subscriptions.delete(eventKey)
110110
}
111111

packages/application/lib/subscription.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ export class Subscription<E extends Event = Event> extends PassThrough {
1414
readonly unsubscribe: () => Promise<any>,
1515
) {
1616
super({ writableObjectMode: true, readableObjectMode: true })
17-
this.once('unsubscribe', () => this.end())
1817
}
1918
}
2019

packages/client-websockets/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,8 @@ class WebsocketsClient<
439439
[KEY[MessageType.ServerUnsubscribe]](buffer: ArrayBuffer) {
440440
const [key] = JSON.parse(decodeText(buffer))
441441
const subscription = this.subscriptions.get(key)
442-
subscription?.unsubscribe()
442+
subscription?.emit('end')
443+
this.subscriptions.delete(key)
443444
}
444445
}
445446

packages/transport-websockets/lib/server.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,12 +338,21 @@ export class WebsocketsTransportServer extends BaseHttpTransportServer {
338338
},
339339
close: async (ws, code, message) => {
340340
const { id, container, streams, subscriptions } = ws.getUserData()
341+
this.logger.debug(
342+
'Close websocket [%s] with code [%s]: %s',
343+
id,
344+
code,
345+
decodeText(message),
346+
)
341347
this.sockets.delete(ws)
342348
this.application.connections.remove(id)
343-
for (const _streams of [streams.up, streams.down, subscriptions]) {
349+
for (const _streams of [streams.up, streams.down]) {
344350
for (const stream of _streams.values()) stream.destroy()
345351
_streams.clear()
346352
}
353+
for (const subscription of subscriptions.values()) {
354+
subscription.unsubscribe()
355+
}
347356
this.handleContainerDisposal(container)
348357
},
349358
})

0 commit comments

Comments
 (0)