Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 49 additions & 58 deletions aedes.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import EventEmitter from 'node:events'
import parallel from 'fastparallel'
import series from 'fastseries'
import { v4 as uuidv4 } from 'uuid'
import reusify from 'reusify'
import { pipeline } from 'stream'
import { randomUUID } from 'node:crypto'
import { promisify } from 'node:util'
import Packet from 'aedes-packet'
import memory from 'aedes-persistence'
import mqemitter from 'mqemitter'
import Client from './lib/client.js'
import { $SYS_PREFIX, bulk } from './lib/utils.js'
import { $SYS_PREFIX, batch, noop, runSeries } from './lib/utils.js'
import pkg from './package.json' with { type: 'json' }

const defaultOptions = {
Expand Down Expand Up @@ -38,7 +35,7 @@ export class Aedes extends EventEmitter {

opts = Object.assign({}, defaultOptions, opts)
this.opts = opts
this.id = opts.id || uuidv4()
this.id = opts.id || randomUUID()
// +1 when construct a new aedes-packet
// internal track for last brokerCounter
this.counter = 0
Expand All @@ -57,9 +54,7 @@ export class Aedes extends EventEmitter {
return new Client(that, conn, req)
}

this._parallel = parallel()
this._series = series()
this._enqueuers = reusify(DoEnqueues)
this._series = runSeries

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
Expand Down Expand Up @@ -112,51 +107,45 @@ export class Aedes extends EventEmitter {
}, noop)
}

function deleteOldBrokers (broker) {
if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
delete that.brokers[broker]
}
}
async function _clearWills () {
const pAuthorizePublish = promisify(that.authorizePublish).bind(that)
const pPublish = promisify(that.publish).bind(that)
const batchSize = 16 // default highWatermark for Writable in ObjectMode

this._clearWillInterval = setInterval(function () {
Object.keys(that.brokers).forEach(deleteOldBrokers)
async function checkAndPublish (will) {
const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()
if (notPublish) {
return
}
// randomize this, so that multiple brokers
// do not publish the same wills at the same time
const client = that.clients[will.clientId] || null
await pAuthorizePublish(client, will)
await pPublish(will)
await that.persistence.delWill({
id: will.clientId,
brokerId: will.brokerId
})
}

pipeline(
that.persistence.streamWill(that.brokers),
bulk(receiveWills),
function done (err) {
if (err) {
that.emit('error', err)
}
// delete old brokers
for (const broker in that.brokers) {
if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
delete that.brokers[broker]
}
)
}, opts.heartbeatInterval * 4)
}

function receiveWills (chunks, done) {
that._parallel(that, checkAndPublish, chunks, done)
const wills = that.persistence.streamWill(that.brokers)
for await (const promises of batch(wills, checkAndPublish, batchSize)) {
await Promise.all(promises)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is by far not equivalent. It also creates a bazillion of promises and it will significantly impact performance

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, the current solution is rather inefficient.
I'm working on a more elegant solution for this one.

}

function checkAndPublish (will, done) {
const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()

if (notPublish) return done()

// randomize this, so that multiple brokers
// do not publish the same wills at the same time
this.authorizePublish(that.clients[will.clientId] || null, will, function (err) {
if (err) { return doneWill() }
that.publish(will, doneWill)

function doneWill (err) {
if (err) { return done(err) }
that.persistence.delWill({
id: will.clientId,
brokerId: will.brokerId
}).then(will => done(undefined, will), done)
}
this._clearWillInterval = setInterval(() => {
_clearWills().catch(err => {
that.emit('error', err)
})
}

}, opts.heartbeatInterval * 4)
this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) {
that.brokers[packet.payload.toString()] = Date.now()
done()
Expand Down Expand Up @@ -266,11 +255,14 @@ export class Aedes extends EventEmitter {
this.closed = true
clearInterval(this._heartbeatInterval)
clearInterval(this._clearWillInterval)
this._parallel(this, closeClient, Object.keys(this.clients), doneClose)
function doneClose () {
const promises = []
for (const clientId of Object.keys(this.clients)) {
promises.push(closeClient(this.clients[clientId]))
Comment thread
seriousme marked this conversation as resolved.
}
Promise.all(promises).finally(() => {
that.emit('closed')
that.mq.close(cb)
}
})
}
}

Expand All @@ -289,7 +281,7 @@ function emitPacket (packet, done) {
}

function enqueueOffline (packet, done) {
const enqueuer = this.broker._enqueuers.get()
const enqueuer = new DoEnqueues()

enqueuer.complete = done
enqueuer.packet = packet
Expand Down Expand Up @@ -333,7 +325,6 @@ class DoEnqueues {

broker.persistence.outgoingEnqueueCombi(subs, packet)
.then(() => complete(null), complete)
broker._enqueuers.release(that)
}
}
}
Expand Down Expand Up @@ -362,8 +353,10 @@ const publishFuncsQoS = [
callPublished
]

function closeClient (client, cb) {
this.clients[client].close(cb)
async function closeClient (clientInstance) {
return new Promise((resolve) => {
clientInstance.close(resolve)
})
}

function defaultPreConnect (client, packet, callback) {
Expand Down Expand Up @@ -401,11 +394,9 @@ class PublishState {
}
}

function noop () {}

function warnMigrate () {
throw new Error(
` Aedes default export has been removed.
` Aedes default export has been removed.
Use 'const aedes = await Aedes.createBroker()' instead.
See: https://github.com/moscajs/aedes/docs/MIGRATION.MD
`)
Expand Down
2 changes: 1 addition & 1 deletion docs/Client.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ a read-only flag indicates if client is closed or not.

## client.id

- `<string>` __Default__: `aedes_${hyperid()}`
- `<string>` __Default__: `aedes_${randomUUID()}`

Client unique identifier, specified by CONNECT packet.

Expand Down
7 changes: 3 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import mqtt from 'mqtt-packet'
import EventEmitter from 'node:events'
import util from 'util'
import eos from 'end-of-stream'
import Packet from 'aedes-packet'
import write from './write.js'
import QoSPacket from './qos-packet.js'
import handleSubscribe from './handlers/subscribe.js'
import handleUnsubscribe from './handlers/unsubscribe.js'
import handle from './handlers/index.js'
import { pipeline } from 'stream'
import { pipeline, finished } from 'stream'
Comment thread
seriousme marked this conversation as resolved.
import { through } from './utils.js'

class Client {
Expand Down Expand Up @@ -88,7 +87,7 @@ class Client {
this._parser.on('error', this.emit.bind(this, 'error'))

conn.on('end', this.close.bind(this))
this._eos = eos(this.conn, this.close.bind(this))
this._eos = finished(this.conn, this.close.bind(this))

const getToForwardPacket = (_packet) => {
// Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169
Expand Down Expand Up @@ -320,7 +319,7 @@ class Client {
this._parser._queue = null

if (this._keepaliveTimer) {
this._keepaliveTimer.clear()
clearTimeout(this._keepaliveTimer)
this._keepaliveInterval = -1
this._keepaliveTimer = null
}
Expand Down
17 changes: 8 additions & 9 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import retimer from 'retimer'
import { randomUUID } from 'node:crypto'
import { pipeline } from 'stream'
import write from '../write.js'
import QoSPacket from '../qos-packet.js'
import { through } from '../utils.js'
import handleSubscribe from './subscribe.js'
import hyperid from 'hyperid'

const uniqueId = hyperid()

class Connack {
constructor (arg) {
Expand Down Expand Up @@ -84,7 +81,7 @@ function init (client, packet, done) {
return
}

client.id = clientId || 'aedes_' + uniqueId()
client.id = clientId || 'aedes_' + randomUUID()
Comment thread
seriousme marked this conversation as resolved.
client.clean = packet.clean
client.version = packet.protocolVersion
client._will = packet.will
Expand Down Expand Up @@ -150,12 +147,14 @@ function authenticate (arg, done) {
function setKeepAlive (arg, done) {
if (this.packet.keepalive > 0) {
const client = this.client
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {

function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
}
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = setTimeout(keepaliveTimeout, client._keepaliveInterval)
}
done()
}
Expand Down
2 changes: 1 addition & 1 deletion lib/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function handle (client, packet, done) {
}

if (client._keepaliveInterval > 0) {
client._keepaliveTimer.reschedule(client._keepaliveInterval)
client._keepaliveTimer.refresh(client._keepaliveInterval)
}
}

Expand Down
49 changes: 28 additions & 21 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import fastfall from 'fastfall'
import Packet from 'aedes-packet'
import { through, validateTopic, $SYS_PREFIX } from '../utils.js'
import { through, validateTopic, $SYS_PREFIX, runFall } from '../utils.js'
import write from '../write.js'

const subscribeTopicActions = fastfall([
const subscribeTopicActions = runFall([
authorize,
storeSubscriptions,
addSubs
])
const restoreTopicActions = fastfall([
const restoreTopicActions = runFall([
authorize,
addSubs
])
Expand Down Expand Up @@ -78,20 +77,30 @@ function _dedupe (subs) {
return ret
}

function handleSubscribe (client, packet, restore, done) {
async function handleSubscribe (client, packet, restore, done) {
packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions)
client.broker._parallel(
new SubscribeState(client, packet, restore, done), // what will be this in the functions
doSubscribe, // function to call
packet.subscriptions, // first argument of the function
restore ? done : completeSubscribe // the function to be called when the parallel ends
)
const state = new SubscribeState(client, packet, restore, done)
try {
await Promise.all(packet.subscriptions.map(sub => doSubscribe(state, sub)))
if (restore) {
done()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if this throws, it would call done twice

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why, can you please explain?

I tried the following:

const pOk = new Promise(resolve => {
    console.log('pOK will resolve')
    resolve()
    })

const pFail = new Promise((resolve,reject) => {
    console.log('pFail will reject')
    reject('pFail rejected')
    })

try {
    await Promise.all([pOk,pFail])
    console.log('all Promises resolved ok')
} catch (err){
    console.log('At least one promise failed, first error:', err)
}

This gives me:

pOK will resolve
pFail will reject
At least one promise failed, first error: pFail rejected

Btw: I reworked doSubscribe to make this part more easy to read.

} else {
completeSubscribe.call(state)
}
} catch (err) {
done(err)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the same problem with promises identified elsewhere

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the same problem with promises identified elsewhere

}

function doSubscribe (sub, done) {
const s = new SubState(this.client, this.packet, sub.qos, sub.rh, sub.rap, sub.nl)
this.subState.push(s)
this.actions.call(s, sub, done)
async function doSubscribe (state, sub) {
return new Promise((resolve, reject) => {
const s = new SubState(state.client, state.packet, sub.qos, sub.rh, sub.rap, sub.nl)
state.subState.push(s)
state.actions.call(s, sub, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})
}

function authorize (sub, done) {
Expand Down Expand Up @@ -159,12 +168,15 @@ function addSubs (sub, done) {
func = blockDollarSignTopics(func)
}

/* c8 ignore start */
if (client.closed || client.broker.closed) {
// a hack, sometimes client.close() or broker.close() happened
// before authenticate() comes back
// we don't continue subscription here
// since it is a rare race condition we ignore it in coverage testing
return
}
/* c8 ignore stop */
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this tested before?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I know, the coverage testing shows that this part is never reached and given the comments it looked like a rare race condition to me.


if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
Expand All @@ -185,13 +197,8 @@ function isStartsWithWildcard (topic) {
return code === 43 || code === 35
}

function completeSubscribe (err) {
function completeSubscribe () {
const done = this.finish

if (err) {
return done(err)
}

const packet = this.packet
const client = this.client

Expand Down
Loading
Loading