@@ -19,6 +19,7 @@ const CLIENTSKEY = 'clients'
1919const WILLSKEY = 'will'
2020const WILLKEY = 'will:'
2121const RETAINEDKEY = 'retained'
22+ const ALL_RETAINEDKEYS = `${ RETAINEDKEY } :*`
2223const OUTGOINGKEY = 'outgoing:'
2324const OUTGOINGIDKEY = 'outgoing-id:'
2425const INCOMINGKEY = 'incoming:'
@@ -52,6 +53,10 @@ function packetKey (brokerId, brokerCounter) {
5253 return `${ PACKETKEY } ${ brokerId } :${ brokerCounter } `
5354}
5455
56+ function retainedKey ( topic ) {
57+ return `${ RETAINEDKEY } :${ encodeURIComponent ( topic ) } `
58+ }
59+
5560function packetCountKey ( brokerId , brokerCounter ) {
5661 return `${ PACKETKEY } ${ brokerId } :${ brokerCounter } :offlineCount`
5762}
@@ -64,25 +69,70 @@ class RedisPersistence extends CachedPersistence {
6469
6570 this . messageIdCache = HLRU ( 100000 )
6671
67- if ( opts . cluster ) {
72+ if ( opts . cluster && Array . isArray ( opts . cluster ) ) {
6873 this . _db = new Redis . Cluster ( opts . cluster )
6974 } else {
7075 this . _db = opts . conn || new Redis ( opts )
7176 }
7277
73- this . _getRetainedChunkBound = this . _getRetainedChunk . bind ( this )
78+ this . hasClusters = ! ! opts . cluster
79+ this . _getRetainedChunkBound = ( this . hasClusters ? this . _getRetainedChunkCluster : this . _getRetainedChunk ) . bind ( this )
80+ this . _getRetainedKeysBound = ( this . hasClusters ? this . _getRetainedKeysCluster : this . _getRetainedKeys ) . bind ( this )
7481 }
7582
76- storeRetained ( packet , cb ) {
83+ /**
84+ * When using clusters we store it using a compound key instead of an hash
85+ * to spread the load across the clusters. See issue #85.
86+ */
87+ _storeRetainedCluster ( packet , cb ) {
88+ if ( packet . payload . length === 0 ) {
89+ this . _db . del ( retainedKey ( packet . topic ) , cb )
90+ } else {
91+ this . _db . set ( retainedKey ( packet . topic ) , msgpack . encode ( packet ) , cb )
92+ }
93+ }
94+
95+ _storeRetained ( packet , cb ) {
7796 if ( packet . payload . length === 0 ) {
7897 this . _db . hdel ( RETAINEDKEY , packet . topic , cb )
7998 } else {
8099 this . _db . hset ( RETAINEDKEY , packet . topic , msgpack . encode ( packet ) , cb )
81100 }
82101 }
83102
84- _getRetainedChunk ( chunk , enc , cb ) {
85- this . _db . hgetBuffer ( RETAINEDKEY , chunk , cb )
103+ storeRetained ( packet , cb ) {
104+ if ( this . hasClusters ) {
105+ this . _storeRetainedCluster ( packet , cb )
106+ } else {
107+ this . _storeRetained ( packet , cb )
108+ }
109+ }
110+
111+ _getRetainedChunkCluster ( topic , enc , cb ) {
112+ this . _db . getBuffer ( retainedKey ( topic ) , cb )
113+ }
114+
115+ _getRetainedChunk ( topic , enc , cb ) {
116+ this . _db . hgetBuffer ( RETAINEDKEY , topic , cb )
117+ }
118+
119+ _getRetainedKeysCluster ( cb ) {
120+ // Get keys of all the masters:
121+ const masters = this . _db . nodes ( 'master' )
122+ Promise . all (
123+ masters
124+ . map ( ( node ) => node . keys ( ALL_RETAINEDKEYS ) )
125+ ) . then ( ( keys ) => {
126+ // keys: [['key1', 'key2'], ['key3', 'key4']]
127+ // flatten the array
128+ cb ( null , keys . reduce ( ( acc , val ) => acc . concat ( val ) , [ ] ) )
129+ } ) . catch ( ( err ) => {
130+ cb ( err )
131+ } )
132+ }
133+
134+ _getRetainedKeys ( cb ) {
135+ this . _db . hkeys ( RETAINEDKEY , cb )
86136 }
87137
88138 createRetainedStreamCombi ( patterns ) {
@@ -95,11 +145,11 @@ class RedisPersistence extends CachedPersistence {
95145
96146 const stream = through . obj ( that . _getRetainedChunkBound )
97147
98- this . _db . hkeys ( RETAINEDKEY , function getKeys ( err , keys ) {
148+ this . _getRetainedKeysBound ( function getKeys ( err , keys ) {
99149 if ( err ) {
100150 stream . emit ( 'error' , err )
101151 } else {
102- matchRetained ( stream , keys , qlobber )
152+ matchRetained ( stream , keys , qlobber , that . hasClusters )
103153 }
104154 } )
105155
@@ -269,7 +319,15 @@ class RedisPersistence extends CachedPersistence {
269319
270320 const encoded = msgpack . encode ( new Packet ( packet ) )
271321
272- this . _db . mset ( pktKey , encoded , countKey , subs . length , finish )
322+ if ( this . hasClusters ) {
323+ // do not do this using `mset`, fails in clusters
324+ outstanding += 1
325+ this . _db . set ( pktKey , encoded , finish )
326+ this . _db . set ( countKey , subs . length , finish )
327+ } else {
328+ this . _db . mset ( pktKey , encoded , countKey , subs . length , finish )
329+ }
330+
273331 if ( ttl > 0 ) {
274332 outstanding += 2
275333 this . _db . expire ( pktKey , ttl , finish )
@@ -319,6 +377,7 @@ class RedisPersistence extends CachedPersistence {
319377 }
320378
321379 let count = 0
380+ let expected = 3
322381 let errored = false
323382
324383 // TODO can be cached in case of wildcard deliveries
@@ -354,7 +413,14 @@ class RedisPersistence extends CachedPersistence {
354413 return cb ( err )
355414 }
356415 if ( remained === 0 ) {
357- that . _db . del ( pktKey , countKey , finish )
416+ if ( that . hasClusters ) {
417+ expected ++
418+ // do not remove multiple keys at once, fails in clusters
419+ that . _db . del ( pktKey , finish )
420+ that . _db . del ( countKey , finish )
421+ } else {
422+ that . _db . del ( pktKey , countKey , finish )
423+ }
358424 } else {
359425 finish ( )
360426 }
@@ -366,7 +432,7 @@ class RedisPersistence extends CachedPersistence {
366432 errored = err
367433 return cb ( err )
368434 }
369- if ( count === 3 && ! errored ) {
435+ if ( count === expected && ! errored ) {
370436 cb ( err , origPacket )
371437 }
372438 }
@@ -525,10 +591,11 @@ class RedisPersistence extends CachedPersistence {
525591 }
526592}
527593
528- function matchRetained ( stream , keys , qlobber ) {
529- for ( const key of keys ) {
530- if ( qlobber . test ( key ) ) {
531- stream . write ( key )
594+ function matchRetained ( stream , topics , qlobber , hasClusters ) {
595+ for ( let t of topics ) {
596+ t = hasClusters ? decodeURIComponent ( t . split ( ':' ) [ 1 ] ) : t
597+ if ( qlobber . test ( t ) ) {
598+ stream . write ( t )
532599 }
533600 }
534601 stream . end ( )
0 commit comments