diff --git a/asyncPersistence.js b/asyncPersistence.js index 1e9628a..a88f8fc 100644 --- a/asyncPersistence.js +++ b/asyncPersistence.js @@ -15,7 +15,7 @@ const CLIENTSKEY = 'clients' const WILLSKEY = 'will' const WILLKEY = 'will:' const RETAINEDKEY = 'retained' -const ALL_RETAINEDKEYS = `${RETAINEDKEY}:*` +// const ALL_RETAINEDKEYS = `${RETAINEDKEY}:*` const OUTGOINGKEY = 'outgoing:' const OUTGOINGIDKEY = 'outgoing-id:' const INCOMINGKEY = 'incoming:' @@ -66,23 +66,40 @@ async function getDecodedValue (db, listKey, key) { return decoded } -async function getRetainedKeys (db, hasClusters) { - if (hasClusters === true) { - // Get keys of all the masters - const masters = db.nodes('master') - const keys = await Promise.all( - masters.map((node) => node.keys(ALL_RETAINEDKEYS)) - ) - return keys.flat() - } - return await db.hkeys(RETAINEDKEY) +function keyToTopic (key) { + return decodeURIComponent(key.split(':')[1]) +} + +function longestCommonPrefix (patterns) { + if (!patterns || patterns.length === 0) { + return '' + } + + patterns.sort() // Sort the array lexicographically + const first = patterns[0] + const last = patterns.at(-1) + let i = 0 + + while (i < first.length && i < last.length && first[i] === last[i]) { + i++ + } + return first.substring(0, i) +} + +function wildCardPosition (pattern) { + const oneIndex = pattern.indexOf(qlobberOpts.wildcard_one) + const someIndex = pattern.indexOf(qlobberOpts.wildcard_some) + if (oneIndex > someIndex) { + return oneIndex + } + return someIndex } async function getRetainedValue (db, topic, hasClusters) { if (hasClusters === true) { - return msgpack.decode(await db.getBuffer(retainedKey(topic))) + return await db.getBuffer(retainedKey(topic)) } - return msgpack.decode(await db.hgetBuffer(RETAINEDKEY, topic)) + return await db.hgetBuffer(RETAINEDKEY, topic) } async function * createWillStream (db, brokers, maxWills) { @@ -100,12 +117,75 @@ function * getClientIdFromEntries (entries) { } } -async function * matchRetained (db, qlobber, hasClusters) { - const keys = await getRetainedKeys(db, hasClusters) - for (const key of keys) { - const topic = hasClusters ? decodeURIComponent(key.split(':')[1]) : key - if (qlobber.test(topic)) { - yield getRetainedValue(db, topic, hasClusters) +// ioredis does not provide a scanBufferStream on a cluster so we need to do this ourselves +// scanBuffer only returns keys! +async function * clusterMatchRetained (cluster, prefix, qlobber, sentTopics) { + const opts = { match: retainedKey(prefix) } + const masterNodes = cluster.nodes('master') + for (const node of masterNodes) { + for await (const key of node.scanBufferStream(opts)) { + if (key.length === 0) { + // nothing found + continue + } + const topic = keyToTopic(key.toString()) + if (!sentTopics.has(topic) && qlobber.test(topic)) { + const value = await cluster.getBuffer(retainedKey(topic)) + if (value) { + const packet = msgpack.decode(value) + yield packet + } + } + } + } +} + +// hscanBuffer always returns keys + values! +async function * singleMatchRetained (db, prefix, qlobber, sentTopics) { + const stream = db.hscanBufferStream(RETAINEDKEY, { match: prefix }) + for await (const data of stream) { + for (let i = 0; i < data.length; i += 2) { + const key = data[i] + const value = data[i + 1] + const topic = key.toString() + if (qlobber.test(topic)) { + const packet = msgpack.decode(value) + if (packet && !sentTopics.has(packet.topic)) { + yield packet + } + } + } + } +} + +async function * matchRetained (db, patterns, hasClusters) { + const wildcards = [] + const qlobber = new QlobberTrue(qlobberOpts) + const sentTopics = new Set() + + for (const p of patterns) { + const wPos = wildCardPosition(p) + if (wPos === -1) { + // no wildcards + const packet = await getRetainedValue(db, p, hasClusters) + if (packet) { + // track for which topics a packet has been sent to avoid sending packets + // twice because of combo of no-wildcard and wildcard + sentTopics.add(packet.topic) + yield msgpack.decode(packet) + } + } else { + // wildcard present + qlobber.add(p) + wildcards.push(p.substring(0, wPos - 1)) + } + } + + if (wildcards.length) { + const prefix = longestCommonPrefix(wildcards) + '*' + const stream = hasClusters ? clusterMatchRetained(db, prefix, qlobber, sentTopics) : singleMatchRetained(db, prefix, qlobber, sentTopics) + for await (const packet of stream) { + yield packet } } } @@ -254,13 +334,7 @@ class AsyncRedisPersistence { } createRetainedStreamCombi (patterns) { - const qlobber = new QlobberTrue(qlobberOpts) - - for (const pattern of patterns) { - qlobber.add(pattern) - } - - return matchRetained(this._db, qlobber, this.hasClusters) + return matchRetained(this._db, patterns, this.hasClusters) } createRetainedStream (pattern) {