diff --git a/asyncPersistence.js b/asyncPersistence.js index 2e4f747..1e9628a 100644 --- a/asyncPersistence.js +++ b/asyncPersistence.js @@ -299,8 +299,8 @@ class AsyncRedisPersistence { async countOffline () { const count = await this._db.scard(CLIENTSKEY) const clientsCount = Number.parseInt(count) || 0 - const subscriptionsCount = this._trie.subscriptionsCount - return { subscriptionsCount, clientsCount } + const subsCount = this._trie.subscriptionsCount + return { subsCount, clientsCount } } async subscriptionsByTopic (topic) { @@ -482,6 +482,4 @@ class AsyncRedisPersistence { } } -module.exports = { - AsyncRedisPersistence -} +module.exports = AsyncRedisPersistence diff --git a/package.json b/package.json index 29a080f..2d905b5 100644 --- a/package.json +++ b/package.json @@ -54,16 +54,16 @@ "devDependencies": { "@fastify/pre-commit": "^2.2.0", "c8": "^10.1.3", - "eslint": "^9.25.1", + "eslint": "^9.26.0", "fastq": "^1.19.1", "license-checker": "^25.0.1", "mqemitter-redis": "^7.0.0", - "mqtt": "^5.11.1", + "mqtt": "^5.13.0", "neostandard": "^0.12.1", - "release-it": "^19.0.1" + "release-it": "^19.0.2" }, "dependencies": { - "aedes-cached-persistence": "^10.0.0", + "aedes-cached-persistence": "^10.1.1", "hashlru": "^2.3.0", "ioredis": "^5.6.1", "msgpack-lite": "^0.1.26", diff --git a/persistence.js b/persistence.js index 61b2cb3..b99cd44 100644 --- a/persistence.js +++ b/persistence.js @@ -1,233 +1,6 @@ 'use strict' -const { Readable } = require('node:stream') -const CachedPersistence = require('aedes-cached-persistence') -const { AsyncRedisPersistence } = require('./asyncPersistence') - -class RedisPersistence extends CachedPersistence { - constructor (opts = {}) { - super(opts) - this.asyncPersistence = new AsyncRedisPersistence(opts) - } - - _setup () { - if (this.ready) { - return - } - this.asyncPersistence.broker = this.broker - this.asyncPersistence._trie = this._trie - this.asyncPersistence.setup() - .then(() => { - this.emit('ready') - }) - .catch(err => { - this.emit('error', err) - }) - } - - storeRetained (packet, cb) { - if (!this.ready) { - this.once('ready', this.storeRetained.bind(this, packet, cb)) - return - } - this.asyncPersistence.storeRetained(packet).then(() => { - cb(null) - }).catch(cb) - } - - createRetainedStream (pattern) { - return Readable.from(this.asyncPersistence.createRetainedStream(pattern)) - } - - createRetainedStreamCombi (patterns) { - return Readable.from(this.asyncPersistence.createRetainedStreamCombi(patterns)) - } - - addSubscriptions (client, subs, cb) { - if (!this.ready) { - this.once('ready', this.addSubscriptions.bind(this, client, subs, cb)) - return - } - - const addSubs1 = this.asyncPersistence.addSubscriptions(client, subs) - // promisify - const addSubs2 = new Promise((resolve, reject) => { - this._addedSubscriptions(client, subs, (err) => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) - Promise.all([addSubs1, addSubs2]) - .then(() => cb(null, client)) - .catch(err => cb(err, client)) - } - - removeSubscriptions (client, subs, cb) { - if (!this.ready) { - this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb)) - return - } - - const remSubs1 = this.asyncPersistence.removeSubscriptions(client, subs) - // promisify - const mappedSubs = subs.map(sub => { return { topic: sub } }) - const remSubs2 = new Promise((resolve, reject) => { - this._removedSubscriptions(client, mappedSubs, (err) => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) - Promise.all([remSubs1, remSubs2]) - .then(() => process.nextTick(cb, null, client)) - .catch(err => cb(err, client)) - } - - subscriptionsByClient (client, cb) { - if (!this.ready) { - this.once('ready', this.subscriptionsByClient.bind(this, client, cb)) - return - } - - this.asyncPersistence.subscriptionsByClient(client) - .then(results => process.nextTick(cb, null, results.length > 0 ? results : null, client)) - .catch(cb) - } - - countOffline (cb) { - this.asyncPersistence.countOffline() - .then(res => process.nextTick(cb, null, res.subscriptionsCount, res.clientsCount)) - .catch(cb) - } - - destroy (cb) { - if (!this.ready) { - this.once('ready', this.destroy.bind(this, cb)) - return - } - - if (this._destroyed) { - throw new Error('destroyed called twice!') - } - - this._destroyed = true - - const finish = cb || noop - - this.asyncPersistence.destroy() - .finally(finish) // swallow err in case of failure - } - - outgoingEnqueue (sub, packet, cb) { - if (!this.ready) { - this.once('ready', this.outgoingEnqueue.bind(this, sub, packet, cb)) - return - } - this.asyncPersistence.outgoingEnqueue(sub, packet) - .then(() => process.nextTick(cb, null, packet)) - .catch(cb) - } - - outgoingEnqueueCombi (subs, packet, cb) { - if (!this.ready) { - this.once('ready', this.outgoingEnqueueCombi.bind(this, subs, packet, cb)) - return - } - this.asyncPersistence.outgoingEnqueueCombi(subs, packet) - .then(() => process.nextTick(cb, null, packet)) - .catch(cb) - } - - outgoingStream (client) { - return Readable.from(this.asyncPersistence.outgoingStream(client)) - } - - outgoingUpdate (client, packet, cb) { - if (!this.ready) { - this.once('ready', this.outgoingUpdate.bind(this, client, packet, cb)) - return - } - this.asyncPersistence.outgoingUpdate(client, packet) - .then(() => cb(null, client, packet)) - .catch(cb) - } - - outgoingClearMessageId (client, packet, cb) { - if (!this.ready) { - this.once('ready', this.outgoingClearMessageId.bind(this, client, packet, cb)) - return - } - this.asyncPersistence.outgoingClearMessageId(client, packet) - .then((packet) => cb(null, packet)) - .catch(cb) - } - - incomingStorePacket (client, packet, cb) { - if (!this.ready) { - this.once('ready', this.incomingStorePacket.bind(this, client, packet, cb)) - return - } - this.asyncPersistence.incomingStorePacket(client, packet) - .then(() => cb(null)) - .catch(cb) - } - - incomingGetPacket (client, packet, cb) { - if (!this.ready) { - this.once('ready', this.incomingGetPacket.bind(this, client, packet, cb)) - return - } - this.asyncPersistence.incomingGetPacket(client, packet) - .then((packet) => cb(null, packet, client)) - .catch(cb) - } - - incomingDelPacket (client, packet, cb) { - if (!this.ready) { - this.once('ready', this.incomingDelPacket.bind(this, client, packet, cb)) - return - } - this.asyncPersistence.incomingDelPacket(client, packet) - .then(() => cb(null)) - .catch(cb) - } - - putWill (client, packet, cb) { - if (!this.ready) { - this.once('ready', this.putWill.bind(this, client, packet, cb)) - return - } - this.asyncPersistence.putWill(client, packet) - .then(() => cb(null, client)) - .catch(cb) - } - - getWill (client, cb) { - this.asyncPersistence.getWill(client) - .then((packet) => cb(null, packet, client)) - .catch(cb) - } - - delWill (client, cb) { - this.asyncPersistence.delWill(client) - .then((packet) => cb(null, packet, client)) - .catch(cb) - } - - streamWill (brokers) { - return Readable.from(this.asyncPersistence.streamWill(brokers)) - } - - getClientList (topic) { - return Readable.from(this.asyncPersistence.getClientList(topic)) - } -} - -function noop () {} - -module.exports = (opts) => new RedisPersistence(opts) +const { CallBackPersistence } = require('aedes-cached-persistence/callBackPersistence.js') +const AsyncPersistence = require('./asyncPersistence.js') +const asyncInstanceFactory = (opts) => new AsyncPersistence(opts) +module.exports = (opts) => new CallBackPersistence(asyncInstanceFactory, opts) diff --git a/test/abs.js b/test/abs.js index 02be5ef..f22ffbd 100644 --- a/test/abs.js +++ b/test/abs.js @@ -3,20 +3,12 @@ const persistence = require('../persistence.js') const Redis = require('ioredis') const mqemitterRedis = require('mqemitter-redis') const abs = require('aedes-cached-persistence/abstract') +const { once } = require('node:events') function sleep (sec) { return new Promise(resolve => setTimeout(resolve, sec * 1000)) } -function waitForEvent (obj, resolveEvt) { - return new Promise((resolve, reject) => { - obj.once(resolveEvt, () => { - resolve() - }) - obj.once('error', reject) - }) -} - function setUpPersistence (t, id, persistenceOpts) { const emitter = mqemitterRedis() const instance = persistence(persistenceOpts) @@ -45,7 +37,7 @@ function unref () { async function createDB () { const db = new Redis() - await waitForEvent(db, 'connect') + await once(db, 'connect') await db.flushall() return db } @@ -86,7 +78,7 @@ async function doTest () { const p = setUpPersistence(t, '1', { conn: externalRedis }) - await waitForEvent(p.instance, 'ready') + await once(p.instance, 'ready') t.assert.ok(true, 'instance ready') t.diagnostic('instance ready') externalRedis.disconnect() diff --git a/test/cluster.js b/test/cluster.js index d11624a..c38893c 100644 --- a/test/cluster.js +++ b/test/cluster.js @@ -3,6 +3,7 @@ const persistence = require('../persistence.js') const Redis = require('ioredis') const mqemitterRedis = require('mqemitter-redis') const abs = require('aedes-cached-persistence/abstract') +const { once } = require('node:events') const nodes = [ { host: 'localhost', port: 6378 }, @@ -17,15 +18,6 @@ function sleep (sec) { return new Promise(resolve => setTimeout(resolve, sec * 1000)) } -function waitForEvent (obj, resolveEvt) { - return new Promise((resolve, reject) => { - obj.once(resolveEvt, () => { - resolve() - }) - obj.once('error', reject) - }) -} - function setUpPersistence (t, id, persistenceOpts) { const emitter = mqemitterRedis() const instance = persistence(persistenceOpts) @@ -54,7 +46,7 @@ function unref () { async function createDB () { const db = new Redis.Cluster(nodes) - await waitForEvent(db, 'connect') + await once(db, 'connect') const dbNodes = db.nodes('master') await Promise.all(dbNodes.map(node => { return node.flushdb() })) return db @@ -97,7 +89,7 @@ async function doTest () { const p = setUpPersistence(t, '1', { conn: externalRedis }) - await waitForEvent(p.instance, 'ready') + await once(p.instance, 'ready') t.assert.ok(true, 'instance ready') t.diagnostic('instance ready') externalRedis.disconnect() @@ -112,6 +104,6 @@ async function doTest () { waitForReady: true }) // make sure everything cleans up nicely - await sleep(2) + await sleep(4) } doTest() diff --git a/test/own.js b/test/own.js index f1fa3d0..af436ca 100644 --- a/test/own.js +++ b/test/own.js @@ -2,93 +2,24 @@ const test = require('node:test') const persistence = require('../persistence.js') const Redis = require('ioredis') const mqemitterRedis = require('mqemitter-redis') - -// promisified versions of the instance methods -// to avoid deep callbacks while testing - -async function outgoingEnqueueCombi (instance, subs, packet) { - return new Promise((resolve, reject) => { - instance.outgoingEnqueueCombi(subs, packet, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function outgoingUpdate (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.outgoingUpdate(client, packet, (err, reclient, repacket) => { - if (err) { - reject(err) - } else { - resolve({ reclient, repacket }) - } - }) - }) -} - -async function subscriptionsByTopic (instance, topic) { - return new Promise((resolve, reject) => { - instance.subscriptionsByTopic(topic, (err, resubs) => { - if (err) { - reject(err) - } else { - resolve(resubs) - } - }) - }) -} - -async function addSubscriptions (instance, client, subs) { - return new Promise((resolve, reject) => { - instance.addSubscriptions(client, subs, (err, reClient) => { - if (err) { - reject(err) - } else { - resolve(reClient) - } - }) - }) -} - -async function putWill (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.putWill(client, packet, (err, reClient) => { - if (err) { - reject(err) - } else { - resolve(reClient) - } - }) - }) -} +const { PromisifiedPersistence } = require('aedes-cached-persistence/promisified.js') +const { once } = require('node:events') // helpers function sleep (sec) { return new Promise(resolve => setTimeout(resolve, sec * 1000)) } -function waitForEvent (obj, resolveEvt) { - return new Promise((resolve, reject) => { - obj.once(resolveEvt, () => { - resolve() - }) - obj.once('error', reject) - }) -} - async function setUpPersistence (t, id, persistenceOpts) { const emitter = mqemitterRedis() const instance = persistence(persistenceOpts) instance.broker = toBroker(id, emitter) if (!instance.ready) { - await waitForEvent(instance, 'ready') + await once(instance, 'ready') } t.diagnostic(`instance ${id} created`) - return { instance, emitter, id } + const p = new PromisifiedPersistence(instance) + return { instance: p, emitter, id } } function cleanUpPersistence (t, { instance, emitter, id }) { @@ -108,7 +39,7 @@ function toBroker (id, emitter) { async function createDB () { const db = new Redis() - await waitForEvent(db, 'connect') + await once(db, 'connect') await db.flushall() return db } @@ -157,7 +88,7 @@ async function doTest () { brokerId: instance.broker.id, brokerCounter: 42 } - await outgoingEnqueueCombi(instance, subs, packet) + await instance.outgoingEnqueueCombi(subs, packet) await sleep(2) const packets = await instance.outgoingStream({ id: 'ttlTest' }).toArray() const noPacket = (packets === undefined) || (packets[0] === undefined) @@ -194,8 +125,8 @@ async function doTest () { messageId: 123 } - await outgoingEnqueueCombi(instance, subs, packet) - await outgoingUpdate(instance, client, packet) + await instance.outgoingEnqueueCombi(subs, packet) + await instance.outgoingUpdate(client, packet) await sleep(2) const exists = await db.exists('packet:1:42') t.assert.ok(!exists, 'packet key should have expired') @@ -239,9 +170,9 @@ async function doTest () { nl: undefined }] - await addSubscriptions(instance, client, subs) + await instance.addSubscriptions(client, subs) await sleep(2) - const resubs = await subscriptionsByTopic(instance2, 'hello') + const resubs = await instance2.subscriptionsByTopic('hello') t.assert.deepEqual(resubs, expected, 'received correct subs') cleanUpPersistence(t, p1) @@ -265,7 +196,7 @@ async function doTest () { } try { - await outgoingUpdate(instance, client, packet) + await instance.outgoingUpdate(client, packet) } catch (err) { t.assert.ok(err, 'error received') t.assert.equal(err.message, 'unknown key', 'Received unknown PUBREC') @@ -292,8 +223,8 @@ async function doTest () { messageId: 123 } - await putWill(instance, client, packet) - await putWill(instance, client, packet) + await instance.putWill(client, packet) + await instance.putWill(client, packet) const wills = await instance.streamWill().toArray() t.assert.equal(wills.length, 1, 'should only be one will') cleanUpPersistence(t, p)