Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 35 additions & 53 deletions aedes.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import EventEmitter from 'node:events'
import { randomUUID } from 'node:crypto'
import { pipeline } from 'stream'
import { promisify } from 'node:util'
import Packet from 'aedes-packet'
import memory from 'aedes-persistence'
import mqemitter from 'mqemitter'
import Client from './lib/client.js'
import { $SYS_PREFIX, bulk, ObjectPool } from './lib/utils.js'
import { $SYS_PREFIX, batch } from './lib/utils.js'
import pkg from './package.json' with { type: 'json' }

const defaultOptions = {
Expand Down Expand Up @@ -54,7 +54,6 @@ export class Aedes extends EventEmitter {
}

this._series = runSeries
this._enqueuers = new ObjectPool(DoEnqueues)

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
Expand Down Expand Up @@ -107,61 +106,45 @@ export class Aedes extends EventEmitter {
}, noop)
}

function deleteOldBrokers (broker) {
if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
delete that.brokers[broker]
}
}
async function _clearWills () {
const pAuthorizePublish = promisify(that.authorizePublish).bind(that)
const pPublish = promisify(that.publish).bind(that)
const batchSize = 16 // default highWatermark for Writable in ObjectMode

this._clearWillInterval = setInterval(function () {
Object.keys(that.brokers).forEach(deleteOldBrokers)
async function checkAndPublish (will) {
const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()
if (notPublish) {
return
}
// randomize this, so that multiple brokers
// do not publish the same wills at the same time
const client = that.clients[will.clientId] || null
await pAuthorizePublish(client, will)
await pPublish(will)
await that.persistence.delWill({
id: will.clientId,
brokerId: will.brokerId
})
}

pipeline(
that.persistence.streamWill(that.brokers),
bulk(receiveWills),
function done (err) {
if (err) {
that.emit('error', err)
}
// delete old brokers
for (const broker in that.brokers) {
if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
delete that.brokers[broker]
}
)
}, opts.heartbeatInterval * 4)
}

async function receiveWills (chunks, done) {
try {
await Promise.all(chunks.map(chunk => new Promise((resolve, reject) => {
checkAndPublish(chunk, (err) => {
if (err) reject(err)
else resolve()
})
})))
done()
} catch (err) {
done(err)
const wills = that.persistence.streamWill(that.brokers)
for await (const promises of batch(wills, checkAndPublish, batchSize)) {
await Promise.all(promises)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is by far not equivalent. It also creates a bazillion of promises and it will significantly impact performance

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, the current solution is rather inefficient.
I'm working on a more elegant solution for this one.

}

function checkAndPublish (will, done) {
const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()

if (notPublish) return done()

// randomize this, so that multiple brokers
// do not publish the same wills at the same time
that.authorizePublish(that.clients[will.clientId] || null, will, function (err) {
if (err) { return doneWill() }
that.publish(will, doneWill)

function doneWill (err) {
if (err) { return done(err) }
that.persistence.delWill({
id: will.clientId,
brokerId: will.brokerId
}).then(will => done(undefined, will), done)
}
this._clearWillInterval = setInterval(() => {
_clearWills().catch(err => {
that.emit('error', err)
})
}

}, opts.heartbeatInterval * 4)
this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) {
that.brokers[packet.payload.toString()] = Date.now()
done()
Expand Down Expand Up @@ -275,7 +258,7 @@ export class Aedes extends EventEmitter {
for (const clientId of Object.keys(this.clients)) {
promises.push(closeClient(this.clients[clientId]))
Comment thread
seriousme marked this conversation as resolved.
}
Promise.all(promises).then(() => {
Promise.all(promises).finally(() => {
that.emit('closed')
that.mq.close(cb)
})
Expand All @@ -297,7 +280,7 @@ function emitPacket (packet, done) {
}

function enqueueOffline (packet, done) {
const enqueuer = this.broker._enqueuers.get()
const enqueuer = new DoEnqueues()

enqueuer.complete = done
enqueuer.packet = packet
Expand Down Expand Up @@ -341,7 +324,6 @@ class DoEnqueues {

broker.persistence.outgoingEnqueueCombi(subs, packet)
.then(() => complete(null), complete)
broker._enqueuers.release(that)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class Client {
this._parser._queue = null

if (this._keepaliveTimer) {
this._keepaliveTimer.clear()
clearTimeout(this._keepaliveTimer)
this._keepaliveInterval = -1
this._keepaliveTimer = null
}
Expand Down
12 changes: 7 additions & 5 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomUUID } from 'node:crypto'
import { pipeline } from 'stream'
import write from '../write.js'
import QoSPacket from '../qos-packet.js'
import { through, retimer } from '../utils.js'
import { through } from '../utils.js'
import handleSubscribe from './subscribe.js'

class Connack {
Expand Down Expand Up @@ -147,12 +147,14 @@ function authenticate (arg, done) {
function setKeepAlive (arg, done) {
if (this.packet.keepalive > 0) {
const client = this.client
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {

function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
}
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = setTimeout(keepaliveTimeout, client._keepaliveInterval)
}
done()
}
Expand Down
2 changes: 1 addition & 1 deletion lib/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function handle (client, packet, done) {
}

if (client._keepaliveInterval > 0) {
client._keepaliveTimer.reschedule(client._keepaliveInterval)
client._keepaliveTimer.refresh(client._keepaliveInterval)
}
}

Expand Down
51 changes: 15 additions & 36 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,7 @@
import Packet from 'aedes-packet'
import { through, validateTopic, $SYS_PREFIX } from '../utils.js'
import { through, validateTopic, $SYS_PREFIX, runFall } from '../utils.js'
import write from '../write.js'

function runFall (fns) {
// run functions in fastfall style, only need the single argument function
return function (arg, cb) {
let i = 0
const ctx = this
function next (err, nextarg) {
if (err || i === fns.length) {
if (typeof cb === 'function') {
cb.call(ctx, err, nextarg)
}
return
}
const fn = fns[i++]
fn.call(ctx, nextarg, next)
}
next(null, arg)
}
}

const subscribeTopicActions = runFall([
authorize,
storeSubscriptions,
Expand Down Expand Up @@ -100,12 +81,7 @@ async function handleSubscribe (client, packet, restore, done) {
packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions)
const state = new SubscribeState(client, packet, restore, done)
try {
await Promise.all(packet.subscriptions.map(sub => new Promise((resolve, reject) => {
doSubscribe.call(state, sub, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})))
await Promise.all(packet.subscriptions.map(sub => doSubscribe(state, sub)))
if (restore) {
done()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if this throws, it would call done twice

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why, can you please explain?

I tried the following:

const pOk = new Promise(resolve => {
    console.log('pOK will resolve')
    resolve()
    })

const pFail = new Promise((resolve,reject) => {
    console.log('pFail will reject')
    reject('pFail rejected')
    })

try {
    await Promise.all([pOk,pFail])
    console.log('all Promises resolved ok')
} catch (err){
    console.log('At least one promise failed, first error:', err)
}

This gives me:

pOK will resolve
pFail will reject
At least one promise failed, first error: pFail rejected

Btw: I reworked doSubscribe to make this part more easy to read.

} else {
Expand All @@ -116,10 +92,15 @@ async function handleSubscribe (client, packet, restore, done) {
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the same problem with promises identified elsewhere

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the same problem with promises identified elsewhere

}

function doSubscribe (sub, done) {
const s = new SubState(this.client, this.packet, sub.qos, sub.rh, sub.rap, sub.nl)
this.subState.push(s)
this.actions.call(s, sub, done)
async function doSubscribe (state, sub) {
return new Promise((resolve, reject) => {
const s = new SubState(state.client, state.packet, sub.qos, sub.rh, sub.rap, sub.nl)
state.subState.push(s)
state.actions.call(s, sub, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})
}

function authorize (sub, done) {
Expand Down Expand Up @@ -187,12 +168,15 @@ function addSubs (sub, done) {
func = blockDollarSignTopics(func)
}

/* c8 ignore start */
if (client.closed || client.broker.closed) {
// a hack, sometimes client.close() or broker.close() happened
// before authenticate() comes back
// we don't continue subscription here
// since it is a rare race condition we ignore it in coverage testing
return
}
/* c8 ignore stop */
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this tested before?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I know, the coverage testing shows that this part is never reached and given the comments it looked like a rare race condition to me.


if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
Expand All @@ -213,13 +197,8 @@ function isStartsWithWildcard (topic) {
return code === 43 || code === 35
}

function completeSubscribe (err) {
function completeSubscribe () {
const done = this.finish

if (err) {
return done(err)
}

const packet = this.packet
const client = this.client

Expand Down
42 changes: 21 additions & 21 deletions lib/handlers/unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,33 @@ function handleUnsubscribe (client, packet, done) {
async function actualUnsubscribe (client, packet, done) {
const state = new UnsubscribeState(client, packet, done)
try {
await Promise.all(packet.unsubscriptions.map(sub => new Promise((resolve, reject) => {
doUnsubscribe.call(state, sub, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})))
await Promise.all(packet.unsubscriptions.map(sub => doUnsubscribe(state, sub)))
completeUnsubscribe.call(state)
} catch (err) {
completeUnsubscribe.call(state, err)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same discussion as doSubscribe(), I also reworked doUnsubscribe to make the code more readable

}

function doUnsubscribe (sub, done) {
const client = this.client
const broker = client.broker
const s = client.subscriptions[sub]

if (s) {
const func = s.func
delete client.subscriptions[sub]
broker.unsubscribe(
sub,
func,
done)
} else {
done()
}
async function doUnsubscribe (state, sub) {
return new Promise((resolve, reject) => {
const client = state.client
const broker = client.broker
const s = client.subscriptions[sub]

if (s) {
const func = s.func
delete client.subscriptions[sub]
broker.unsubscribe(
sub,
func,
(err) => {
if (err) reject(err)
else resolve()
})
} else {
resolve()
}
})
}

function completeUnsubscribe (err) {
Expand Down
Loading
Loading