forked from cameri/nostream
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubscribe-message-handler.ts
More file actions
134 lines (114 loc) · 5.25 KB
/
Copy pathsubscribe-message-handler.ts
File metadata and controls
134 lines (114 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import { anyPass, equals, isNil, map, propSatisfies, uniqWith } from 'ramda'
// import { addAbortSignal } from 'stream'
import { pipeline } from 'stream/promises'
import {
createEndOfStoredEventsNoticeMessage,
createNoticeMessage,
createOutgoingEventMessage,
} from '../utils/messages'
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
import { isEventMatchingFilter, isExpiredEvent, toNostrEvent } from '../utils/event'
import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream'
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
import { createLogger } from '../factories/logger-factory'
import { Event } from '../@types/event'
import { IEventRepository } from '../@types/repositories'
import { IWebSocketAdapter } from '../@types/adapters'
import { Settings } from '../@types/settings'
import { SubscribeMessage } from '../@types/messages'
import { WebSocketAdapterEvent } from '../constants/adapter'
const logger = createLogger('subscribe-message-handler')
export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
//private readonly abortController: AbortController
public constructor(
private readonly webSocket: IWebSocketAdapter,
private readonly eventRepository: IEventRepository,
private readonly settings: () => Settings,
) {
//this.abortController = new AbortController()
}
public abort(): void {
//this.abortController.abort()
}
public async handleMessage(message: SubscribeMessage): Promise<void> {
const subscriptionId = message[1]
const filters = uniqWith(equals, message.slice(2)) as SubscriptionFilter[]
const reason = this.canSubscribe(subscriptionId, filters)
if (reason) {
logger('subscription %s with %o rejected: %s', subscriptionId, filters, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Subscription rejected: ${reason}`))
return
}
this.webSocket.emit(WebSocketAdapterEvent.Subscribe, subscriptionId, filters)
await this.fetchAndSend(subscriptionId, filters)
}
private async fetchAndSend(subscriptionId: string, filters: SubscriptionFilter[]): Promise<void> {
logger('fetching events for subscription %s with filters %o', subscriptionId, filters)
const sendEvent = (event: Event) =>
this.webSocket.emit(WebSocketAdapterEvent.Message, createOutgoingEventMessage(subscriptionId, event))
const sendEOSE = () =>
this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId))
const isSubscribedToEvent = SubscribeMessageHandler.isClientSubscribedToEvent(filters)
const isTagUnexpired = (event: Event) => {
if (isExpiredEvent(event)) {
return false
}
return true
}
const findEvents = this.eventRepository.findByFilters(filters).stream()
// const abortableFindEvents = addAbortSignal(this.abortController.signal, findEvents)
try {
await pipeline(
findEvents,
streamFilter(propSatisfies(isNil, 'deleted_at')),
streamMap(toNostrEvent),
streamFilter(isTagUnexpired),
streamFilter(isSubscribedToEvent),
streamEach(sendEvent),
streamEnd(sendEOSE),
)
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
logger('subscription %s aborted: %o', subscriptionId, error)
findEvents.destroy()
} else {
logger('error streaming events: %o', error)
}
throw error
}
}
private static isClientSubscribedToEvent(filters: SubscriptionFilter[]): (event: Event) => boolean {
return anyPass(map(isEventMatchingFilter)(filters))
}
private canSubscribe(subscriptionId: SubscriptionId, filters: SubscriptionFilter[]): string | undefined {
const subscriptions = this.webSocket.getSubscriptions()
const existingSubscription = subscriptions.get(subscriptionId)
const subscriptionLimits = this.settings().limits?.client?.subscription
if (existingSubscription?.length && equals(filters, existingSubscription)) {
return `Duplicate subscription ${subscriptionId}: Ignoring`
}
const maxSubscriptions = subscriptionLimits?.maxSubscriptions ?? 0
if (maxSubscriptions > 0 && !existingSubscription?.length && subscriptions.size + 1 > maxSubscriptions) {
return `Too many subscriptions: Number of subscriptions must be less than or equal to ${maxSubscriptions}`
}
const maxFilters = subscriptionLimits?.maxFilters ?? 0
if (maxFilters > 0) {
if (filters.length > maxFilters) {
return `Too many filters: Number of filters per susbscription must be less then or equal to ${maxFilters}`
}
}
const maxLimit = subscriptionLimits?.maxLimit ?? 0
if (maxLimit > 0) {
const hasExcessiveLimit = filters.some((filter) => filter.limit !== undefined && filter.limit > maxLimit)
if (hasExcessiveLimit) {
return `Limit too high: Filter limit must be less than or equal to ${maxLimit}`
}
}
if (
typeof subscriptionLimits?.maxSubscriptionIdLength === 'number' &&
subscriptionId.length > subscriptionLimits.maxSubscriptionIdLength
) {
return `Subscription ID too long: Subscription ID must be less or equal to ${subscriptionLimits.maxSubscriptionIdLength}`
}
}
}