Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions asyncPersistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ class AsyncRedisPersistence {
async countOffline () {
const count = await this._db.scard(CLIENTSKEY)
const clientsCount = Number.parseInt(count) || 0
const subscriptionsCount = this._trie.subscriptionsCount
return { subscriptionsCount, clientsCount }
const subsCount = this._trie.subscriptionsCount
return { subsCount, clientsCount }
}

async subscriptionsByTopic (topic) {
Expand Down Expand Up @@ -482,6 +482,4 @@ class AsyncRedisPersistence {
}
}

module.exports = {
AsyncRedisPersistence
}
module.exports = AsyncRedisPersistence
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@
"devDependencies": {
"@fastify/pre-commit": "^2.2.0",
"c8": "^10.1.3",
"eslint": "^9.25.1",
"eslint": "^9.26.0",
"fastq": "^1.19.1",
"license-checker": "^25.0.1",
"mqemitter-redis": "^7.0.0",
"mqtt": "^5.11.1",
"mqtt": "^5.13.0",
"neostandard": "^0.12.1",
"release-it": "^19.0.1"
"release-it": "^19.0.2"
},
"dependencies": {
"aedes-cached-persistence": "^10.0.0",
"aedes-cached-persistence": "^10.1.1",
"hashlru": "^2.3.0",
"ioredis": "^5.6.1",
"msgpack-lite": "^0.1.26",
Expand Down
235 changes: 4 additions & 231 deletions persistence.js
Original file line number Diff line number Diff line change
@@ -1,233 +1,6 @@
'use strict'

const { Readable } = require('node:stream')
const CachedPersistence = require('aedes-cached-persistence')
const { AsyncRedisPersistence } = require('./asyncPersistence')

class RedisPersistence extends CachedPersistence {
constructor (opts = {}) {
super(opts)
this.asyncPersistence = new AsyncRedisPersistence(opts)
}

_setup () {
if (this.ready) {
return
}
this.asyncPersistence.broker = this.broker
this.asyncPersistence._trie = this._trie
this.asyncPersistence.setup()
.then(() => {
this.emit('ready')
})
.catch(err => {
this.emit('error', err)
})
}

storeRetained (packet, cb) {
if (!this.ready) {
this.once('ready', this.storeRetained.bind(this, packet, cb))
return
}
this.asyncPersistence.storeRetained(packet).then(() => {
cb(null)
}).catch(cb)
}

createRetainedStream (pattern) {
return Readable.from(this.asyncPersistence.createRetainedStream(pattern))
}

createRetainedStreamCombi (patterns) {
return Readable.from(this.asyncPersistence.createRetainedStreamCombi(patterns))
}

addSubscriptions (client, subs, cb) {
if (!this.ready) {
this.once('ready', this.addSubscriptions.bind(this, client, subs, cb))
return
}

const addSubs1 = this.asyncPersistence.addSubscriptions(client, subs)
// promisify
const addSubs2 = new Promise((resolve, reject) => {
this._addedSubscriptions(client, subs, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
Promise.all([addSubs1, addSubs2])
.then(() => cb(null, client))
.catch(err => cb(err, client))
}

removeSubscriptions (client, subs, cb) {
if (!this.ready) {
this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb))
return
}

const remSubs1 = this.asyncPersistence.removeSubscriptions(client, subs)
// promisify
const mappedSubs = subs.map(sub => { return { topic: sub } })
const remSubs2 = new Promise((resolve, reject) => {
this._removedSubscriptions(client, mappedSubs, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
Promise.all([remSubs1, remSubs2])
.then(() => process.nextTick(cb, null, client))
.catch(err => cb(err, client))
}

subscriptionsByClient (client, cb) {
if (!this.ready) {
this.once('ready', this.subscriptionsByClient.bind(this, client, cb))
return
}

this.asyncPersistence.subscriptionsByClient(client)
.then(results => process.nextTick(cb, null, results.length > 0 ? results : null, client))
.catch(cb)
}

countOffline (cb) {
this.asyncPersistence.countOffline()
.then(res => process.nextTick(cb, null, res.subscriptionsCount, res.clientsCount))
.catch(cb)
}

destroy (cb) {
if (!this.ready) {
this.once('ready', this.destroy.bind(this, cb))
return
}

if (this._destroyed) {
throw new Error('destroyed called twice!')
}

this._destroyed = true

const finish = cb || noop

this.asyncPersistence.destroy()
.finally(finish) // swallow err in case of failure
}

outgoingEnqueue (sub, packet, cb) {
if (!this.ready) {
this.once('ready', this.outgoingEnqueue.bind(this, sub, packet, cb))
return
}
this.asyncPersistence.outgoingEnqueue(sub, packet)
.then(() => process.nextTick(cb, null, packet))
.catch(cb)
}

outgoingEnqueueCombi (subs, packet, cb) {
if (!this.ready) {
this.once('ready', this.outgoingEnqueueCombi.bind(this, subs, packet, cb))
return
}
this.asyncPersistence.outgoingEnqueueCombi(subs, packet)
.then(() => process.nextTick(cb, null, packet))
.catch(cb)
}

outgoingStream (client) {
return Readable.from(this.asyncPersistence.outgoingStream(client))
}

outgoingUpdate (client, packet, cb) {
if (!this.ready) {
this.once('ready', this.outgoingUpdate.bind(this, client, packet, cb))
return
}
this.asyncPersistence.outgoingUpdate(client, packet)
.then(() => cb(null, client, packet))
.catch(cb)
}

outgoingClearMessageId (client, packet, cb) {
if (!this.ready) {
this.once('ready', this.outgoingClearMessageId.bind(this, client, packet, cb))
return
}
this.asyncPersistence.outgoingClearMessageId(client, packet)
.then((packet) => cb(null, packet))
.catch(cb)
}

incomingStorePacket (client, packet, cb) {
if (!this.ready) {
this.once('ready', this.incomingStorePacket.bind(this, client, packet, cb))
return
}
this.asyncPersistence.incomingStorePacket(client, packet)
.then(() => cb(null))
.catch(cb)
}

incomingGetPacket (client, packet, cb) {
if (!this.ready) {
this.once('ready', this.incomingGetPacket.bind(this, client, packet, cb))
return
}
this.asyncPersistence.incomingGetPacket(client, packet)
.then((packet) => cb(null, packet, client))
.catch(cb)
}

incomingDelPacket (client, packet, cb) {
if (!this.ready) {
this.once('ready', this.incomingDelPacket.bind(this, client, packet, cb))
return
}
this.asyncPersistence.incomingDelPacket(client, packet)
.then(() => cb(null))
.catch(cb)
}

putWill (client, packet, cb) {
if (!this.ready) {
this.once('ready', this.putWill.bind(this, client, packet, cb))
return
}
this.asyncPersistence.putWill(client, packet)
.then(() => cb(null, client))
.catch(cb)
}

getWill (client, cb) {
this.asyncPersistence.getWill(client)
.then((packet) => cb(null, packet, client))
.catch(cb)
}

delWill (client, cb) {
this.asyncPersistence.delWill(client)
.then((packet) => cb(null, packet, client))
.catch(cb)
}

streamWill (brokers) {
return Readable.from(this.asyncPersistence.streamWill(brokers))
}

getClientList (topic) {
return Readable.from(this.asyncPersistence.getClientList(topic))
}
}

function noop () {}

module.exports = (opts) => new RedisPersistence(opts)
const { CallBackPersistence } = require('aedes-cached-persistence/callBackPersistence.js')
const AsyncPersistence = require('./asyncPersistence.js')
const asyncInstanceFactory = (opts) => new AsyncPersistence(opts)
module.exports = (opts) => new CallBackPersistence(asyncInstanceFactory, opts)
14 changes: 3 additions & 11 deletions test/abs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,12 @@ const persistence = require('../persistence.js')
const Redis = require('ioredis')
const mqemitterRedis = require('mqemitter-redis')
const abs = require('aedes-cached-persistence/abstract')
const { once } = require('node:events')

function sleep (sec) {
return new Promise(resolve => setTimeout(resolve, sec * 1000))
}

function waitForEvent (obj, resolveEvt) {
return new Promise((resolve, reject) => {
obj.once(resolveEvt, () => {
resolve()
})
obj.once('error', reject)
})
}

function setUpPersistence (t, id, persistenceOpts) {
const emitter = mqemitterRedis()
const instance = persistence(persistenceOpts)
Expand Down Expand Up @@ -45,7 +37,7 @@ function unref () {

async function createDB () {
const db = new Redis()
await waitForEvent(db, 'connect')
await once(db, 'connect')
await db.flushall()
return db
}
Expand Down Expand Up @@ -86,7 +78,7 @@ async function doTest () {
const p = setUpPersistence(t, '1', {
conn: externalRedis
})
await waitForEvent(p.instance, 'ready')
await once(p.instance, 'ready')
t.assert.ok(true, 'instance ready')
t.diagnostic('instance ready')
externalRedis.disconnect()
Expand Down
16 changes: 4 additions & 12 deletions test/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const persistence = require('../persistence.js')
const Redis = require('ioredis')
const mqemitterRedis = require('mqemitter-redis')
const abs = require('aedes-cached-persistence/abstract')
const { once } = require('node:events')

const nodes = [
{ host: 'localhost', port: 6378 },
Expand All @@ -17,15 +18,6 @@ function sleep (sec) {
return new Promise(resolve => setTimeout(resolve, sec * 1000))
}

function waitForEvent (obj, resolveEvt) {
return new Promise((resolve, reject) => {
obj.once(resolveEvt, () => {
resolve()
})
obj.once('error', reject)
})
}

function setUpPersistence (t, id, persistenceOpts) {
const emitter = mqemitterRedis()
const instance = persistence(persistenceOpts)
Expand Down Expand Up @@ -54,7 +46,7 @@ function unref () {

async function createDB () {
const db = new Redis.Cluster(nodes)
await waitForEvent(db, 'connect')
await once(db, 'connect')
const dbNodes = db.nodes('master')
await Promise.all(dbNodes.map(node => { return node.flushdb() }))
return db
Expand Down Expand Up @@ -97,7 +89,7 @@ async function doTest () {
const p = setUpPersistence(t, '1', {
conn: externalRedis
})
await waitForEvent(p.instance, 'ready')
await once(p.instance, 'ready')
t.assert.ok(true, 'instance ready')
t.diagnostic('instance ready')
externalRedis.disconnect()
Expand All @@ -112,6 +104,6 @@ async function doTest () {
waitForReady: true
})
// make sure everything cleans up nicely
await sleep(2)
await sleep(4)
}
doTest()
Loading