Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 100 additions & 26 deletions asyncPersistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const CLIENTSKEY = 'clients'
const WILLSKEY = 'will'
const WILLKEY = 'will:'
const RETAINEDKEY = 'retained'
const ALL_RETAINEDKEYS = `${RETAINEDKEY}:*`
// const ALL_RETAINEDKEYS = `${RETAINEDKEY}:*`
const OUTGOINGKEY = 'outgoing:'
const OUTGOINGIDKEY = 'outgoing-id:'
const INCOMINGKEY = 'incoming:'
Expand Down Expand Up @@ -66,23 +66,40 @@ async function getDecodedValue (db, listKey, key) {
return decoded
}

async function getRetainedKeys (db, hasClusters) {
if (hasClusters === true) {
// Get keys of all the masters
const masters = db.nodes('master')
const keys = await Promise.all(
masters.map((node) => node.keys(ALL_RETAINEDKEYS))
)
return keys.flat()
}
return await db.hkeys(RETAINEDKEY)
function keyToTopic (key) {
return decodeURIComponent(key.split(':')[1])
}

function longestCommonPrefix (patterns) {
if (!patterns || patterns.length === 0) {
return ''
}

patterns.sort() // Sort the array lexicographically
const first = patterns[0]
const last = patterns.at(-1)
let i = 0

while (i < first.length && i < last.length && first[i] === last[i]) {
i++
}
return first.substring(0, i)
}

function wildCardPosition (pattern) {
const oneIndex = pattern.indexOf(qlobberOpts.wildcard_one)
const someIndex = pattern.indexOf(qlobberOpts.wildcard_some)
if (oneIndex > someIndex) {
return oneIndex
}
return someIndex
}

async function getRetainedValue (db, topic, hasClusters) {
if (hasClusters === true) {
return msgpack.decode(await db.getBuffer(retainedKey(topic)))
return await db.getBuffer(retainedKey(topic))
}
return msgpack.decode(await db.hgetBuffer(RETAINEDKEY, topic))
return await db.hgetBuffer(RETAINEDKEY, topic)
}

async function * createWillStream (db, brokers, maxWills) {
Expand All @@ -100,12 +117,75 @@ function * getClientIdFromEntries (entries) {
}
}

async function * matchRetained (db, qlobber, hasClusters) {
const keys = await getRetainedKeys(db, hasClusters)
for (const key of keys) {
const topic = hasClusters ? decodeURIComponent(key.split(':')[1]) : key
if (qlobber.test(topic)) {
yield getRetainedValue(db, topic, hasClusters)
// ioredis does not provide a scanBufferStream on a cluster so we need to do this ourselves
// scanBuffer only returns keys!
async function * clusterMatchRetained (cluster, prefix, qlobber, sentTopics) {
const opts = { match: retainedKey(prefix) }
const masterNodes = cluster.nodes('master')
for (const node of masterNodes) {
for await (const key of node.scanBufferStream(opts)) {
if (key.length === 0) {
// nothing found
continue
}
const topic = keyToTopic(key.toString())
if (!sentTopics.has(topic) && qlobber.test(topic)) {
const value = await cluster.getBuffer(retainedKey(topic))
if (value) {
const packet = msgpack.decode(value)
yield packet
}
}
}
}
}

// hscanBuffer always returns keys + values!
async function * singleMatchRetained (db, prefix, qlobber, sentTopics) {
const stream = db.hscanBufferStream(RETAINEDKEY, { match: prefix })
for await (const data of stream) {
for (let i = 0; i < data.length; i += 2) {
const key = data[i]
const value = data[i + 1]
const topic = key.toString()
if (qlobber.test(topic)) {
const packet = msgpack.decode(value)
if (packet && !sentTopics.has(packet.topic)) {
yield packet
}
}
}
}
}

async function * matchRetained (db, patterns, hasClusters) {
const wildcards = []
const qlobber = new QlobberTrue(qlobberOpts)
const sentTopics = new Set()

for (const p of patterns) {
const wPos = wildCardPosition(p)
if (wPos === -1) {
// no wildcards
const packet = await getRetainedValue(db, p, hasClusters)
if (packet) {
// track for which topics a packet has been sent to avoid sending packets
// twice because of combo of no-wildcard and wildcard
sentTopics.add(packet.topic)
yield msgpack.decode(packet)
}
} else {
// wildcard present
qlobber.add(p)
wildcards.push(p.substring(0, wPos - 1))
}
}

if (wildcards.length) {
const prefix = longestCommonPrefix(wildcards) + '*'
const stream = hasClusters ? clusterMatchRetained(db, prefix, qlobber, sentTopics) : singleMatchRetained(db, prefix, qlobber, sentTopics)
for await (const packet of stream) {
yield packet
}
}
}
Expand Down Expand Up @@ -254,13 +334,7 @@ class AsyncRedisPersistence {
}

createRetainedStreamCombi (patterns) {
const qlobber = new QlobberTrue(qlobberOpts)

for (const pattern of patterns) {
qlobber.add(pattern)
}

return matchRetained(this._db, qlobber, this.hasClusters)
return matchRetained(this._db, patterns, this.hasClusters)
}

createRetainedStream (pattern) {
Expand Down