@@ -13,6 +13,26 @@ const QLOBBER_OPTIONS = {
1313 match_empty_levels : true
1414}
1515
16+ // Batching limits for retained message pattern queries
17+ // MongoDB has a BSON document size limit of 16MB, but regex patterns hit
18+ // practical compilation/execution limits around 32KB. These values are tuned
19+ // to prevent "regular expression is too large" errors while maintaining good
20+ // query performance.
21+ //
22+ // Why two limits?
23+ // - MAX_TOTAL_PATTERN_LENGTH: Prevents MongoDB regex size errors (~32KB limit)
24+ // Cumulative pattern length is tracked to stay well under MongoDB's practical
25+ // regex limit, providing a safety margin for regex escaping and construction.
26+ //
27+ // - MAX_PATTERNS_PER_BATCH: Optimizes query performance by reducing network overhead
28+ // Larger batches mean fewer MongoDB queries, which typically improves performance
29+ // more than smaller batches with simpler regex patterns.
30+ //
31+ // These values balance safety (preventing regex errors) with performance (minimizing
32+ // the number of database queries needed to process subscription patterns).
33+ const MAX_PATTERNS_PER_BATCH = 200
34+ const MAX_TOTAL_PATTERN_LENGTH = 15000
35+
1636class AsyncMongoPersistence {
1737 // private class members start with #
1838 #trie
@@ -48,6 +68,29 @@ class AsyncMongoPersistence {
4868 this . #executing = false // used as lock while a bulk is executing
4969 }
5070
71+ #addTTLIndexes ( indexes ) {
72+ const addTTLIndex = ( collection , key , expireAfterSeconds ) => {
73+ if ( expireAfterSeconds >= 0 ) {
74+ indexes . push ( { collection, key, name : 'ttl' , expireAfterSeconds } )
75+ }
76+ }
77+
78+ if ( this . #opts. ttl . subscriptions >= 0 ) {
79+ addTTLIndex (
80+ 'subscriptions' ,
81+ this . #opts. ttlAfterDisconnected ? 'disconnected' : 'added' ,
82+ this . #opts. ttl . subscriptions
83+ )
84+ }
85+
86+ if ( this . #opts. ttl . packets ) {
87+ addTTLIndex ( 'retained' , 'added' , this . #opts. ttl . packets . retained )
88+ addTTLIndex ( 'will' , 'packet.added' , this . #opts. ttl . packets . will )
89+ addTTLIndex ( 'outgoing' , 'packet.added' , this . #opts. ttl . packets . outgoing )
90+ addTTLIndex ( 'incoming' , 'packet.added' , this . #opts. ttl . packets . incoming )
91+ }
92+ }
93+
5194 // access #broker, only for testing
5295 get broker ( ) {
5396 return this . #broker
@@ -148,52 +191,8 @@ class AsyncMongoPersistence {
148191 }
149192 ]
150193
151- if ( this . #opts. ttl . subscriptions >= 0 ) {
152- indexes . push ( {
153- collection : 'subscriptions' ,
154- key : this . #opts. ttlAfterDisconnected ? 'disconnected' : 'added' ,
155- name : 'ttl' ,
156- expireAfterSeconds : this . #opts. ttl . subscriptions
157- } )
158- }
159-
160- if ( this . #opts. ttl . packets ) {
161- if ( this . #opts. ttl . packets . retained >= 0 ) {
162- indexes . push ( {
163- collection : 'retained' ,
164- key : 'added' ,
165- name : 'ttl' ,
166- expireAfterSeconds : this . #opts. ttl . packets . retained
167- } )
168- }
169-
170- if ( this . #opts. ttl . packets . will >= 0 ) {
171- indexes . push ( {
172- collection : 'will' ,
173- key : 'packet.added' ,
174- name : 'ttl' ,
175- expireAfterSeconds : this . #opts. ttl . packets . will
176- } )
177- }
178-
179- if ( this . #opts. ttl . packets . outgoing >= 0 ) {
180- indexes . push ( {
181- collection : 'outgoing' ,
182- key : 'packet.added' ,
183- name : 'ttl' ,
184- expireAfterSeconds : this . #opts. ttl . packets . outgoing
185- } )
186- }
187-
188- if ( this . #opts. ttl . packets . incoming >= 0 ) {
189- indexes . push ( {
190- collection : 'incoming' ,
191- key : 'packet.added' ,
192- name : 'ttl' ,
193- expireAfterSeconds : this . #opts. ttl . packets . incoming
194- } )
195- }
196- }
194+ // Add TTL indexes
195+ this . #addTTLIndexes( indexes )
197196 // create all indexes in parallel
198197 await Promise . all ( indexes . map ( createIndex ) )
199198
@@ -235,7 +234,7 @@ class AsyncMongoPersistence {
235234 onEnd . push ( resolve )
236235 }
237236 // execute operations and ignore the error
238- await this . #cl. retained . bulkWrite ( operations ) . catch ( ( ) => { } )
237+ await this . #cl. retained . bulkWrite ( operations ) . catch ( ( ) => { } )
239238 // resolve all promises
240239 while ( onEnd . length ) onEnd . shift ( ) . call ( )
241240 // check if we have new packets in queue
@@ -256,14 +255,13 @@ class AsyncMongoPersistence {
256255 const { promise, resolve } = promiseWithResolvers ( )
257256 const queue = this . #retainedBulkQueue
258257 const filter = { topic : packet . topic }
259- const setTTL = this . #opts. ttl . packets
260258
261259 if ( packet . payload . length > 0 ) {
262260 queue . push ( {
263261 operation : {
264262 updateOne : {
265263 filter,
266- update : { $set : decoratePacket ( packet , setTTL ) } ,
264+ update : { $set : decoratePacket ( packet , this . #opts . ttl . packets ) } ,
267265 upsert : true
268266 }
269267 } ,
@@ -288,17 +286,84 @@ class AsyncMongoPersistence {
288286 }
289287
290288 async * createRetainedStreamCombi ( patterns ) {
291- const regexes = [ ]
292289 const matcher = new Qlobber ( QLOBBER_OPTIONS )
293290
294291 for ( let i = 0 ; i < patterns . length ; i ++ ) {
295292 matcher . add ( patterns [ i ] , true )
296- regexes . push ( regEscape ( patterns [ i ] ) . replace ( / ( \/ * # | \\ \+ ) .* $ / , '' ) )
297293 }
298294
295+ // Calculate total pattern length
296+ const totalLength = patterns . reduce ( ( sum , p ) => sum + p . length , 0 )
297+
298+ // Determine if we need to batch
299+ const needsBatching =
300+ patterns . length > MAX_PATTERNS_PER_BATCH ||
301+ totalLength > MAX_TOTAL_PATTERN_LENGTH
302+
303+ if ( needsBatching ) {
304+ // Process patterns in batches to avoid creating regex that's too large
305+ // Use dynamic batching based on cumulative length
306+ const seenTopics = new Set ( ) // Track yielded packets to avoid duplicates
307+ const batches = [ ]
308+ let currentBatch = [ ]
309+ let currentLength = 0
310+
311+ for ( const pattern of patterns ) {
312+ const patternLength = pattern . length
313+
314+ // Edge case: if a single pattern exceeds MAX_TOTAL_PATTERN_LENGTH,
315+ // it will be placed in its own batch. This is intentional behavior
316+ // to ensure the pattern is still processed (MongoDB will handle it
317+ // or fail with a clear error). Very long patterns (>32KB after escaping)
318+ // may still cause MongoDB "regular expression is too large" errors.
319+
320+ // Start a new batch if adding this pattern would exceed limits
321+ if ( currentBatch . length >= MAX_PATTERNS_PER_BATCH ||
322+ ( currentLength + patternLength > MAX_TOTAL_PATTERN_LENGTH && currentBatch . length > 0 ) ) {
323+ batches . push ( currentBatch )
324+ currentBatch = [ ]
325+ currentLength = 0
326+ }
327+ currentBatch . push ( pattern )
328+ currentLength += patternLength
329+ }
330+ // Add the last batch if not empty
331+ if ( currentBatch . length > 0 ) {
332+ batches . push ( currentBatch )
333+ }
334+
335+ for ( const batch of batches ) {
336+ for await ( const packet of this . #queryRetainedByPatterns( batch , matcher ) ) {
337+ // Avoid duplicates across batches
338+ if ( ! seenTopics . has ( packet . topic ) ) {
339+ seenTopics . add ( packet . topic )
340+ yield packet
341+ }
342+ }
343+ }
344+ } else {
345+ // Original logic for small pattern sets
346+ for await ( const packet of this . #queryRetainedByPatterns( patterns , matcher ) ) {
347+ yield packet
348+ }
349+ }
350+ }
351+
352+ async * #queryRetainedByPatterns ( patterns , matcher ) {
353+ // Early return for empty patterns to avoid creating an empty regex
354+ // that would match all documents in the collection
355+ if ( patterns . length === 0 ) {
356+ return
357+ }
358+
359+ const regexes = patterns . map ( pattern =>
360+ regEscape ( pattern ) . replace ( / ( \/ * # | \\ \+ ) .* $ / , '' )
361+ )
362+
299363 const topic = new RegExp ( regexes . join ( '|' ) )
300364 const filter = { topic }
301- const exclude = { _id : 0 } // exclude the _id field
365+ const exclude = { _id : 0 }
366+
302367 for await ( const result of this . #cl. retained . find ( filter ) . project ( exclude ) ) {
303368 const packet = asPacket ( result )
304369 if ( matcher . match ( packet . topic ) . length > 0 ) {
@@ -308,13 +373,11 @@ class AsyncMongoPersistence {
308373 }
309374
310375 async addSubscriptions ( client , subs ) {
311- const operations = [ ]
312376 const subscriptions = [ ]
313- for ( const sub of subs ) {
314- const subscription = Object . assign ( { } , sub )
315- subscription . clientId = client . id
377+ const operations = subs . map ( sub => {
378+ const subscription = { ...sub , clientId : client . id }
316379 subscriptions . push ( subscription )
317- operations . push ( {
380+ return {
318381 updateOne : {
319382 filter : {
320383 clientId : client . id ,
@@ -325,26 +388,23 @@ class AsyncMongoPersistence {
325388 } ,
326389 upsert : true
327390 }
328- } )
329- }
391+ }
392+ } )
330393
331394 await this . #cl. subscriptions . bulkWrite ( operations )
332395 // inform the broker
333396 await this . #broadcast. addedSubscriptions ( client , subs )
334397 }
335398
336399 async removeSubscriptions ( client , subs ) {
337- const operations = [ ]
338- for ( const topic of subs ) {
339- operations . push ( {
340- deleteOne : {
341- filter : {
342- clientId : client . id ,
343- topic
344- }
400+ const operations = subs . map ( topic => ( {
401+ deleteOne : {
402+ filter : {
403+ clientId : client . id ,
404+ topic
345405 }
346- } )
347- }
406+ }
407+ } ) )
348408 await this . #cl. subscriptions . bulkWrite ( operations )
349409 // inform the broker
350410 await this . #broadcast. removedSubscriptions ( client , subs )
@@ -406,16 +466,12 @@ class AsyncMongoPersistence {
406466 return packet
407467 }
408468
409- const packets = [ ]
410469 const newPacket = new Packet ( packet )
411- const setTTL = this . #opts. ttl . packets
412-
413- for ( const sub of subs ) {
414- packets . push ( {
415- clientId : sub . clientId ,
416- packet : decoratePacket ( newPacket , setTTL )
417- } )
418- }
470+ const decoratedPacket = decoratePacket ( newPacket , this . #opts. ttl . packets )
471+ const packets = subs . map ( sub => ( {
472+ clientId : sub . clientId ,
473+ packet : decoratedPacket
474+ } ) )
419475
420476 await this . #cl. outgoing . insertMany ( packets )
421477 }
@@ -435,26 +491,20 @@ class AsyncMongoPersistence {
435491 }
436492
437493 async outgoingClearMessageId ( client , packet ) {
438- const outgoing = this . #cl. outgoing
439-
440- const result = await outgoing . findOneAndDelete ( {
494+ const result = await this . #cl. outgoing . findOneAndDelete ( {
441495 clientId : client . id ,
442496 'packet.messageId' : packet . messageId
443497 } )
444- if ( ! result ) {
445- return null // packet not found
446- }
447- return asPacket ( result )
498+ return result ? asPacket ( result ) : null
448499 }
449500
450501 async incomingStorePacket ( client , packet ) {
451502 const newPacket = new Packet ( packet )
452503 newPacket . messageId = packet . messageId
453- const setTTL = this . #opts. ttl . packets
454504
455505 await this . #cl. incoming . insertOne ( {
456506 clientId : client . id ,
457- packet : decoratePacket ( newPacket , setTTL )
507+ packet : decoratePacket ( newPacket , this . #opts . ttl . packets )
458508 } )
459509 }
460510
@@ -479,12 +529,11 @@ class AsyncMongoPersistence {
479529 }
480530
481531 async putWill ( client , packet ) {
482- const setTTL = this . #opts. ttl . packets
483532 packet . clientId = client . id
484533 packet . brokerId = this . #broker. id
485534 await this . #cl. will . insertOne ( {
486535 clientId : client . id ,
487- packet : decoratePacket ( packet , setTTL )
536+ packet : decoratePacket ( packet , this . #opts . ttl . packets )
488537 } )
489538 }
490539
0 commit comments