Skip to content

Commit bf878ed

Browse files
authored
fix: ZMS 226 (#817)
* Use async for mailbox folder cache operations instead of doing it in the background * Promise fix * Treat negative key as no counter value
1 parent 76f79fd commit bf878ed

7 files changed

Lines changed: 63 additions & 63 deletions

File tree

lib/api/mailboxes.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const { GetMailboxesResult } = require('../schemas/response/mailboxes-schemas');
1313
const { MAX_MAILBOX_NAME_LENGTH, MAX_SUB_MAILBOXES } = require('../consts');
1414

1515
module.exports = (db, server, mailboxHandler) => {
16-
const getMailboxCounter = util.promisify(tools.getMailboxCounter);
16+
const getMailboxCounter = tools.getMailboxCounter;
1717
const updateMailbox = util.promisify(mailboxHandler.update.bind(mailboxHandler));
1818
const deleteMailbox = util.promisify(mailboxHandler.del.bind(mailboxHandler));
1919
const createMailbox = mailboxHandler.createAsync.bind(mailboxHandler);
@@ -242,7 +242,7 @@ module.exports = (db, server, mailboxHandler) => {
242242
counterOps.push(
243243
(async () => {
244244
try {
245-
total = await getMailboxCounter(db, mailboxData._id, false);
245+
total = await getMailboxCounter(db, mailboxData._id);
246246
} catch (err) {
247247
// ignore
248248
}
@@ -514,7 +514,7 @@ module.exports = (db, server, mailboxHandler) => {
514514
let total, unseen;
515515

516516
try {
517-
total = await getMailboxCounter(db, mailboxData._id, false);
517+
total = await getMailboxCounter(db, mailboxData._id);
518518
} catch (err) {
519519
// ignore
520520
}

lib/api/messages.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti
6060

6161
const encryptMessage = util.promisify(messageHandler.encryptMessage.bind(messageHandler));
6262

63-
const getMailboxCounter = util.promisify(tools.getMailboxCounter);
63+
const getMailboxCounter = tools.getMailboxCounter;
6464
const asyncForward = util.promisify(forward);
6565

6666
const addThreadCountersToMessageList = async (user, list) => {
@@ -3694,8 +3694,8 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti
36943694

36953695
async function getFilteredMessageCount(filter) {
36963696
if (Object.keys(filter).length === 1 && filter.mailbox) {
3697-
// try to use cached value to get the count
3698-
return await getMailboxCounter(db, filter.mailbox, false);
3697+
// Try to use cached value to get the count
3698+
return await getMailboxCounter(db, filter.mailbox);
36993699
}
37003700

37013701
return await db.database.collection('messages').countDocuments(filter);

lib/api/updates.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ const base32 = require('base32.js');
99
const { sessSchema, sessIPSchema } = require('../schemas');
1010
const { userId } = require('../schemas/request/general-schemas');
1111

12+
const getMailboxCounterCb = (db, mailbox, type, callback) => {
13+
tools
14+
.getMailboxCounter(db, mailbox, type)
15+
.then(sum => callback(null, sum))
16+
.catch(err => callback(err));
17+
};
18+
1219
module.exports = (db, server, notifier) => {
1320
server.get(
1421
{
@@ -269,11 +276,11 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
269276
});
270277
}
271278
let mailbox = new ObjectId(mailboxes[mailboxPos++]);
272-
tools.getMailboxCounter(db, mailbox, false, (err, total) => {
279+
getMailboxCounterCb(db, mailbox, false, (err, total) => {
273280
if (err) {
274281
// ignore
275282
}
276-
tools.getMailboxCounter(db, mailbox, 'unseen', (err, unseen) => {
283+
getMailboxCounterCb(db, mailbox, 'unseen', (err, unseen) => {
277284
if (err) {
278285
// ignore
279286
}

lib/imap-notifier.js

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ class ImapNotifier extends EventEmitter {
1414
super();
1515

1616
this.database = options.database;
17-
this.publisher = options.redis || new Redis(tools.redisConfig(config.dbs.redis));
18-
this.counters = counters(this.publisher);
17+
this.redis = options.redis || new Redis(tools.redisConfig(config.dbs.redis));
18+
this.counters = counters(this.redis);
1919

2020
this.logger = options.logger || {
2121
info: log.silly.bind(log, 'IMAP'),
@@ -210,7 +210,7 @@ class ImapNotifier extends EventEmitter {
210210
let pushToJournal = async () => {
211211
for (let entry of entries) {
212212
if (entry.command === 'EXISTS' && entry.junk) {
213-
await publish(this.publisher, {
213+
await publish(this.redis, {
214214
ev: entry.junk > 0 ? MARKED_SPAM : MARKED_HAM,
215215
user: entry.user.toString(),
216216
mailbox: entry.mailbox.toString(),
@@ -225,7 +225,11 @@ class ImapNotifier extends EventEmitter {
225225
ordered: false
226226
});
227227

228-
setImmediate(() => this.updateCounters(entries));
228+
try {
229+
await this.updateCounters(entries);
230+
} catch (err) {
231+
this.logger.error('Failed to update mailbox counters', err.message);
232+
}
229233

230234
return r.insertedCount;
231235
};
@@ -298,7 +302,7 @@ class ImapNotifier extends EventEmitter {
298302
e: user.toString(),
299303
p: payload
300304
});
301-
this.publisher.publish('wd_events', data);
305+
this.redis.publish('wd_events', data);
302306
});
303307
}
304308

@@ -322,10 +326,11 @@ class ImapNotifier extends EventEmitter {
322326
.toArray(callback);
323327
}
324328

325-
updateCounters(entries) {
329+
async updateCounters(entries) {
326330
if (!entries) {
327331
return;
328332
}
333+
329334
let counters = new Map();
330335
(Array.isArray(entries) ? entries : [].concat(entries || [])).forEach(entry => {
331336
let m = entry.mailbox.toString();
@@ -358,33 +363,24 @@ class ImapNotifier extends EventEmitter {
358363
}
359364
});
360365

361-
let pos = 0;
362-
let rows = Array.from(counters);
363-
let updateCounter = () => {
364-
if (pos >= rows.length) {
365-
return;
366-
}
367-
let row = rows[pos++];
366+
for (let row of Array.from(counters)) {
368367
if (!row || !row.length) {
369-
return updateCounter();
368+
continue;
370369
}
370+
371371
let mailbox = row[0];
372372
let delta = row[1];
373373

374-
this.counters.cachedcounter('total:' + mailbox, delta.total, consts.MAILBOX_COUNTER_TTL, () => {
375-
if (delta.unseenChange) {
376-
// Message info changed in mailbox, so just te be sure, clear the unseen counter as well
377-
// Unseen counter is more volatile and also easier to count (usually only a small number on indexed messages)
378-
this.publisher.del('unseen:' + mailbox, updateCounter);
379-
} else if (delta.unseen) {
380-
this.counters.cachedcounter('unseen:' + mailbox, delta.unseen, consts.MAILBOX_COUNTER_TTL, updateCounter);
381-
} else {
382-
setImmediate(updateCounter);
383-
}
384-
});
385-
};
374+
await this.redis.cachedcounter(`total:${mailbox}`, delta.total, consts.MAILBOX_COUNTER_TTL);
386375

387-
updateCounter();
376+
if (delta.unseenChange) {
377+
// Message info changed in mailbox, so just te be sure, clear the unseen counter as well
378+
// Unseen counter is more volatile and also easier to count (usually only a small number on indexed messages)
379+
await this.redis.del('unseen:' + mailbox);
380+
} else if (delta.unseen) {
381+
await this.redis.cachedcounter(`unseen:${mailbox}`, delta.unseen, consts.MAILBOX_COUNTER_TTL);
382+
}
383+
}
388384
}
389385

390386
allocateConnection(data, callback) {

lib/lua/cachedcounter.lua

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@ local increment = tonumber(ARGV[1]) or 0;
33
local ttl = tonumber(ARGV[2]) or 0;
44

55
if redis.call("EXISTS", key) == 1 then
6+
67
redis.call("INCRBY", key, increment);
78
local sum = tonumber(redis.call("GET", key)) or 0;
9+
if sum < 0 then
10+
redis.call("DEL", key);
11+
return nil;
12+
end
13+
814
-- extend the life of this counter by ttl seconds
915
redis.call("EXPIRE", key, ttl);
16+
1017
return sum;
1118
else
1219
return nil;

lib/tasks/clear-folder.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
const log = require('npmlog');
44
const db = require('../db');
5-
const util = require('util');
65

76
let run = async (task, data, options) => {
87
const messageHandler = options.messageHandler;
9-
const deleteMessage = util.promisify(messageHandler.del.bind(messageHandler));
108

119
const { user, mailbox, skipArchive } = data;
1210

@@ -27,7 +25,7 @@ let run = async (task, data, options) => {
2725
}
2826

2927
try {
30-
await deleteMessage({
28+
await messageHandler.delAsync({
3129
user,
3230
mailbox: { user, mailbox },
3331
messageData,

lib/tools.js

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,14 @@ function flatAddresses(addresses) {
168168
return list;
169169
}
170170

171-
function getMailboxCounter(db, mailbox, type, done) {
172-
let prefix = type ? type : 'total';
173-
db.redis.get(prefix + ':' + mailbox.toString(), (err, sum) => {
174-
if (err) {
175-
return done(err);
176-
}
171+
async function getMailboxCounter(db, mailbox, type) {
172+
const cacheKey = `${type || 'total'}:${mailbox.toString()}`;
177173

178-
if (sum !== null) {
179-
return done(null, Number(sum));
174+
try {
175+
// Check cache for pre-calculated counter value
176+
let sum = await db.redis.get(cacheKey);
177+
if (sum !== null && !isNaN(sum) && Number(sum) >= 0) {
178+
return Number(sum);
180179
}
181180

182181
// calculate sum
@@ -185,24 +184,17 @@ function getMailboxCounter(db, mailbox, type, done) {
185184
query[type] = true;
186185
}
187186

188-
db.database.collection('messages').countDocuments(query, (err, sum) => {
189-
if (err) {
190-
return done(err);
191-
}
187+
sum = await db.database.collection('messages').countDocuments(query);
192188

193-
// cache calculated sum in redis
194-
db.redis
195-
.multi()
196-
.set(prefix + ':' + mailbox.toString(), sum)
197-
.expire(prefix + ':' + mailbox.toString(), consts.MAILBOX_COUNTER_TTL)
198-
.exec(err => {
199-
if (err) {
200-
errors.notify(err);
201-
}
202-
done(null, sum);
203-
});
204-
});
205-
});
189+
// Cache calculated sum in redis
190+
191+
await db.redis.multi().set(cacheKey, sum).expire(cacheKey, consts.MAILBOX_COUNTER_TTL).exec();
192+
193+
return sum;
194+
} catch (err) {
195+
errors.notify(err);
196+
throw err;
197+
}
206198
}
207199

208200
function renderEmailTemplate(tags, template) {

0 commit comments

Comments
 (0)