Skip to content

Commit 3bb602c

Browse files
authored
Merge pull request #38 from behrad/new_cached_persistence_interface
based on cached-persistence 5.0.1
2 parents 7eb6c4f + f50ef0f commit 3bb602c

2 files changed

Lines changed: 12 additions & 23 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
"tape": "^4.7.0"
3333
},
3434
"dependencies": {
35-
"aedes-cached-persistence": "^4.0.0",
35+
"aedes-cached-persistence": "^5.0.1",
3636
"from2": "^2.3.0",
3737
"ioredis": "^3.0.0",
3838
"msgpack-lite": "^0.1.20",

persistence.js

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,30 +109,28 @@ RedisPersistence.prototype.addSubscriptions = function (client, subs, cb) {
109109

110110
var toStore = {}
111111
var offlines = []
112-
var count = 0
113112
var published = 0
114-
var errored = null
113+
var errored
115114

116115
for (var i = 0; i < subs.length; i++) {
117116
var sub = subs[i]
118117
toStore[sub.topic] = sub.qos
119118
if (sub.qos > 0) {
120119
offlines.push(sub.topic)
121-
count++
122-
this._waitFor(client, sub.topic, finish)
123120
}
124121
}
125122

126123
this._db.sadd(subsKey, offlines, noop)
127124
this._db.sadd(clientsKey, client.id, noop)
128125
this._db.hmset(clientSubKey, toStore, finish)
129126

130-
this._addedSubscriptions(client, subs)
127+
this._addedSubscriptions(client, subs, finish)
131128

132-
function finish () {
129+
function finish (err) {
130+
errored = err
133131
published++
134-
if (published === count + 1 && !errored) {
135-
cb(null, client)
132+
if (published === 2) {
133+
cb(errored, client)
136134
}
137135
}
138136
}
@@ -145,17 +143,13 @@ RedisPersistence.prototype.removeSubscriptions = function (client, subs, cb) {
145143

146144
var clientSubKey = clientKey + client.id
147145

148-
var published = 0
149-
var count = 0
150146
var removableTopics = []
151147
var errored = false
152148

153149
for (var i = 0; i < subs.length; i++) {
154-
this._waitFor(client, subs[i], finish)
155150
if (this._matcher.match(subs[i]).length === 1) {
156151
removableTopics.push(subs[i])
157152
}
158-
count++
159153
}
160154

161155
var that = this
@@ -179,17 +173,12 @@ RedisPersistence.prototype.removeSubscriptions = function (client, subs, cb) {
179173
}
180174
})
181175

182-
that._removedSubscriptions(client, subs.map(toSub))
183-
184-
finish()
176+
that._removedSubscriptions(client, subs.map(toSub), function () {
177+
if (!errored) {
178+
cb(null, client)
179+
}
180+
})
185181
})
186-
187-
function finish () {
188-
published++
189-
if (published === count + 1 && !errored) {
190-
cb(null, client)
191-
}
192-
}
193182
}
194183

195184
function toSub (topic) {

0 commit comments

Comments
 (0)