-
Notifications
You must be signed in to change notification settings - Fork 228
Expand file tree
/
Copy pathqueueHelpers.js
More file actions
116 lines (99 loc) · 2.51 KB
/
queueHelpers.js
File metadata and controls
116 lines (99 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
const _ = require('lodash');
/**
 * Formats a byte count as a human-readable string using binary (1024-based)
 * units, e.g. 1536 -> "1.5 KiB".
 *
 * Values are shown with three significant digits, except in the 1000-1023
 * range of a unit, where four are kept so the integer part is not rounded
 * away (1023 -> "1023 B" rather than "1020 B"). Magnitudes below one byte are
 * printed unscaled, and values beyond the largest unit stay in YiB.
 *
 * @param {Number} num The number of bytes to format.
 * @returns {string} The formatted value, or error text if `num` is not a
 * finite number (non-number values are not coerced).
 */
function formatBytes(num) {
  if (!Number.isFinite(num)) {
    return 'Could not retrieve value';
  }

  const UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
  const BASE = 1024;
  const LAST_UNIT = UNITS.length - 1;

  const sign = num < 0 ? '-' : '';
  const magnitude = Math.abs(num);

  // Sub-byte magnitudes (including zero) are printed as-is.
  if (magnitude < 1) {
    return `${sign}${magnitude} B`;
  }

  // Dividing by 1024 is exact in binary floating point, whereas a ratio of
  // logarithms may round just below an integer at a unit boundary.
  let exponent = 0;
  let scaled = magnitude;
  while (scaled >= BASE && exponent < LAST_UNIT) {
    scaled /= BASE;
    exponent += 1;
  }

  // With a 1024 base the scaled value can have four integer digits; keep all
  // of them instead of rounding to the nearest ten.
  const digits = scaled >= 1000 && scaled < BASE ? 4 : 3;
  let rounded = Number(scaled.toPrecision(digits));

  // Rounding may reach exactly 1024 of the current unit: carry into the next.
  if (rounded >= BASE && exponent < LAST_UNIT) {
    rounded = 1;
    exponent += 1;
  }

  return `${sign}${rounded} ${UNITS[exponent]}`;
}
/**
 * Parses a raw Redis INFO reply into a flat key/value object.
 *
 * The text is split on CRLF. Each line is trimmed; blank lines, lines that
 * start with '#', and lines without a ':' after at least one character are
 * ignored. Everything before the first ':' becomes the key and everything
 * after it the (string) value; a repeated key keeps its last value.
 *
 * @param {*} res The raw INFO reply.
 * @returns {Object} Parsed fields, or an empty object if `res` is not a
 * string.
 */
function splitInfo(res) {
  const parsed = {};
  if (typeof res !== 'string') {
    return parsed;
  }

  for (const rawLine of res.split('\r\n')) {
    const entry = rawLine.trim();
    if (entry === '' || entry[0] === '#') {
      continue;
    }

    const separatorAt = entry.indexOf(':');
    if (separatorAt <= 0) {
      continue;
    }

    const field = entry.slice(0, separatorAt);
    parsed[field] = entry.slice(separatorAt + 1);
  }

  return parsed;
}
/**
 * Queue helper utilities: Redis server statistics, pause-state lookup and the
 * job-state lists for each supported queue library.
 */
const Helpers = {
  /**
   * Collects a small set of Redis server metrics for a queue.
   *
   * Only the fields named in `_usefulMetrics` are kept, and the two memory
   * fields are replaced by their formatted, human-readable form. The metric
   * list is read through `this`, so call this as a method of the helpers
   * object.
   *
   * @param {Object} queue Queue whose `client` is (or resolves to) a Redis
   * client exposing `info()`.
   * @returns {Promise<Object>} The selected metrics.
   */
  async getStats(queue) {
    const client = await queue.client;

    // Per the original author's note: node-redis refreshes client.serverInfo
    // when info() is called, while ioredis only returns the raw reply, which
    // then has to be parsed here.
    const rawReply = await client.info();
    const serverInfo = client.serverInfo || splitInfo(rawReply);

    const isUsefulMetric = (value, key) => _.includes(this._usefulMetrics, key);
    const stats = _.pickBy(serverInfo, isUsefulMetric);

    // Both memory fields are always set, even when absent from the reply.
    for (const field of ['used_memory', 'total_system_memory']) {
      stats[field] = formatBytes(Number.parseInt(stats[field], 10));
    }

    return stats;
  },

  /**
   * Reports whether the queue is paused by delegating to `queue.isPaused()`.
   *
   * @param {Object} queue Queue exposing an `isPaused()` method.
   * @returns {Promise<*>} Whatever `queue.isPaused()` yields.
   */
  async isPaused(queue) {
    return queue.isPaused();
  },

  /**
   * Names of the server-info fields that getStats keeps; all others are
   * dropped.
   */
  _usefulMetrics: [
    'redis_version',
    'total_system_memory',
    'used_memory',
    'mem_fragmentation_ratio',
    'connected_clients',
    'blocked_clients',
  ],

  /**
   * Job states that are valid in a bee queue.
   */
  BEE_STATES: [
    'waiting',
    'active',
    'succeeded',
    'failed',
    'delayed',
  ],

  /**
   * Job states that are valid in a bull queue.
   */
  BULL_STATES: ['waiting', 'active', 'completed', 'failed', 'delayed', 'paused'],

  /**
   * Job states that are valid in a bullmq queue.
   */
  BULLMQ_STATES: [
    'waiting',
    'active',
    'completed',
    'failed',
    'delayed',
    'paused',
    'waiting-children',
  ],
};
module.exports = Helpers;