Skip to content

Commit d4d2dcb

Browse files
authored
Add worker support (#693)
1 parent ffe193b commit d4d2dcb

12 files changed

Lines changed: 696 additions & 272 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ project adheres to [Semantic Versioning](http://semver.org/).
2929
- fix: Browser compatibility for Gauge.startTimer()
3030
- ci: Run benchmarks for pull requests
3131
- ci: switch out deprecated benchmark-regression library for replacement
32+
- AggregatorRegistry renamed to ClusterRegistry, old name deprecated
3233

3334
### Added
3435

3536
- Expanded benchmarking code
37+
- new WorkerRegistry to provide equivalent support to AggregatorRegistry
3638

3739
## [15.1.3] - 2024-06-27
3840

example/cluster.js

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,32 @@
22

33
const cluster = require('cluster');
44
const express = require('express');
5+
const { ClusterRegistry } = require('../');
6+
57
const metricsServer = express();
6-
const AggregatorRegistry = require('../').AggregatorRegistry;
7-
const aggregatorRegistry = new AggregatorRegistry();
8+
const clusterRegistry = new ClusterRegistry();
89

910
if (cluster.isPrimary) {
10-
for (let i = 0; i < 4; i++) {
11-
cluster.fork();
11+
for (let i = 1; i <= 4; i++) {
12+
cluster.fork({ ...process.env, PORT: 3000 + i });
1213
}
1314

1415
metricsServer.get('/cluster_metrics', async (req, res) => {
1516
try {
16-
const metrics = await aggregatorRegistry.clusterMetrics();
17-
res.set('Content-Type', aggregatorRegistry.contentType);
17+
const metrics = await clusterRegistry.clusterMetrics();
18+
res.set('Content-Type', clusterRegistry.contentType);
1819
res.send(metrics);
1920
} catch (ex) {
2021
res.statusCode = 500;
2122
res.send(ex.message);
2223
}
2324
});
2425

25-
metricsServer.listen(3001);
26-
console.log(
27-
'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics',
28-
);
26+
metricsServer.listen(3000, () => {
27+
console.log(
28+
'Cluster metrics server listening to 3000, metrics exposed on /cluster_metrics',
29+
);
30+
});
2931
} else {
3032
require('./server.js');
3133
}

example/server.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const express = require('express');
44
const cluster = require('cluster');
5+
const { isMainThread, threadId } = require('node:worker_threads');
56
const server = express();
67
const register = require('../').register;
78

@@ -73,6 +74,11 @@ if (cluster.isWorker) {
7374
setInterval(() => {
7475
c.inc({ code: `worker_${cluster.worker.id}` });
7576
}, 2000);
77+
} else if (!isMainThread) {
78+
// Expose some worker-specific metric as an example
79+
setInterval(() => {
80+
c.inc({ code: `worker_${threadId}` });
81+
}, 2000);
7682
}
7783

7884
const t = [];
@@ -108,7 +114,8 @@ server.get('/metrics/counter', async (req, res) => {
108114
});
109115

110116
const port = process.env.PORT || 3000;
111-
console.log(
112-
`Server listening to ${port}, metrics exposed on /metrics endpoint`,
113-
);
114-
server.listen(port);
117+
server.listen(port, () => {
118+
console.log(
119+
`Server listening to ${port}, metrics exposed on /metrics endpoint`,
120+
);
121+
});

example/worker.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
'use strict';
2+
3+
const Path = require('path');
4+
const { Worker, isMainThread, workerData } = require('node:worker_threads');
5+
const express = require('express');
6+
const WorkerRegistry = require('../').WorkerRegistry;
7+
8+
const collector = workerData?.['prom-client']?.collector === true;
9+
const metricsServer = express();
10+
const workerRegistry = new WorkerRegistry(
11+
WorkerRegistry.PROMETHEUS_CONTENT_TYPE,
12+
collector,
13+
);
14+
15+
if (isMainThread) {
16+
// By default the main thread is the collector. Demonstrating off-loading.
17+
new Worker(Path.join(__filename), {
18+
env: { ...process.env, PORT: 3333 },
19+
workerData: {
20+
'prom-client': { collector: true },
21+
},
22+
});
23+
24+
for (let i = 1; i <= 10; i++) {
25+
const opts = { env: { ...process.env, PORT: 3000 + i } };
26+
new Worker(Path.join(__filename), opts);
27+
}
28+
}
29+
30+
if (collector) {
31+
metricsServer.get('/cluster_metrics', async (req, res) => {
32+
try {
33+
const metrics = await workerRegistry.workerMetrics();
34+
res.set('Content-Type', workerRegistry.contentType);
35+
res.send(metrics);
36+
} catch (ex) {
37+
res.statusCode = 500;
38+
res.send(ex.message);
39+
}
40+
});
41+
42+
metricsServer.listen(process.env.PORT, () => {
43+
console.log(
44+
`Cluster metrics server listening to ${process.env.PORT}, metrics exposed on /cluster_metrics`,
45+
);
46+
});
47+
} else {
48+
require('./server.js');
49+
}

index.d.ts

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,19 @@ export class Registry<
9898
*/
9999
static merge(registers: Registry[]): Registry;
100100

101+
/**
102+
* Creates a new Registry instance from an array of metrics that were
103+
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
104+
* the method specified by their `aggregator` property, or by summation if
105+
* `aggregator` is undefined.
106+
* @param {Array} metricsArr Array of metrics, each of which created by
107+
* `registry.getMetricsAsJSON()`.
108+
* @returns {Registry} aggregated registry.
109+
*/
110+
static aggregate<T extends RegistryContentType>(
111+
metricsArr: Array<object>,
112+
): Registry<T>; // TODO Promise?
113+
101114
/**
102115
* HTTP Prometheus Content-Type for metrics response headers.
103116
*/
@@ -131,7 +144,7 @@ export const prometheusContentType: PrometheusContentType;
131144
*/
132145
export const openMetricsContentType: OpenMetricsContentType;
133146

134-
export class AggregatorRegistry<
147+
export class ClusterRegistry<
135148
T extends RegistryContentType,
136149
> extends Registry<T> {
137150
/**
@@ -142,17 +155,60 @@ export class AggregatorRegistry<
142155
clusterMetrics(): Promise<string>;
143156

144157
/**
145-
* Creates a new Registry instance from an array of metrics that were
146-
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
147-
* the method specified by their `aggregator` property, or by summation if
148-
* `aggregator` is undefined.
149-
* @param {Array} metricsArr Array of metrics, each of which created by
150-
* `registry.getMetricsAsJSON()`.
151-
* @returns {Registry} aggregated registry.
158+
* Sets the registry or registries to be aggregated. Call from workers to
159+
* use a registry/registries other than the default global registry.
160+
* @param {Array<Registry>|Registry} regs Registry or registries to be
161+
* aggregated.
162+
* @returns {void}
152163
*/
153-
static aggregate<T extends RegistryContentType>(
154-
metricsArr: Array<object>,
155-
): Registry<T>; // TODO Promise?
164+
static setRegistries(
165+
regs:
166+
| Array<
167+
Registry<PrometheusContentType> | Registry<OpenMetricsContentType>
168+
>
169+
| Registry<PrometheusContentType>
170+
| Registry<OpenMetricsContentType>,
171+
): void;
172+
}
173+
174+
export class WorkerRegistry<T extends RegistryContentType> extends Registry<T> {
175+
/**
176+
* Gets aggregated metrics for all workers.
177+
* @returns {Promise<string>} Promise that resolves with the aggregated
178+
* metrics.
179+
*/
180+
workerMetrics(): Promise<string>;
181+
182+
addWorker(worker: Worker): void;
183+
/**
184+
* Sets the registry or registries to be aggregated. Call from workers to
185+
* use a registry/registries other than the default global registry.
186+
* @param {Array<Registry>|Registry} regs Registry or registries to be
187+
* aggregated.
188+
* @returns {void}
189+
*/
190+
static setRegistries(
191+
regs:
192+
| Array<
193+
Registry<PrometheusContentType> | Registry<OpenMetricsContentType>
194+
>
195+
| Registry<PrometheusContentType>
196+
| Registry<OpenMetricsContentType>,
197+
): void;
198+
}
199+
200+
/**
201+
* @deprecated
202+
*/
203+
export class AggregatorRegistry<
204+
T extends RegistryContentType,
205+
> extends Registry<T> {
206+
/**
207+
* Gets aggregated metrics for all workers.
208+
* @returns {Promise<string>} Promise that resolves with the aggregated
209+
* metrics.
210+
*/
211+
clusterMetrics(): Promise<string>;
156212

157213
/**
158214
* Sets the registry or registries to be aggregated. Call from workers to

index.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,8 @@ exports.exponentialBuckets =
3434
exports.collectDefaultMetrics = require('./lib/defaultMetrics');
3535

3636
exports.aggregators = require('./lib/metricAggregators').aggregators;
37-
exports.AggregatorRegistry = require('./lib/cluster');
37+
exports.ClusterRegistry = require('./lib/cluster');
38+
exports.WorkerRegistry = require('./lib/worker');
39+
/** @deprecated */
40+
exports.AggregatorRegistry = exports.ClusterRegistry;
41+
exports[Symbol('util')] = require('./lib/util');

lib/cluster.js

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
*/
1010

1111
const Registry = require('./registry');
12-
const { Grouper } = require('./util');
13-
const { aggregators } = require('./metricAggregators');
1412
// We need to lazy-load the 'cluster' module as some application servers -
1513
// namely Passenger - crash when it is imported.
1614
let cluster = () => {
@@ -109,39 +107,7 @@ class AggregatorRegistry extends Registry {
109107
metricsArr,
110108
registryType = Registry.PROMETHEUS_CONTENT_TYPE,
111109
) {
112-
const aggregatedRegistry = new Registry();
113-
const metricsByName = new Grouper();
114-
115-
aggregatedRegistry.setContentType(registryType);
116-
117-
// Gather by name
118-
metricsArr.forEach(metrics => {
119-
metrics.forEach(metric => {
120-
metricsByName.add(metric.name, metric);
121-
});
122-
});
123-
124-
// Aggregate gathered metrics.
125-
metricsByName.forEach(metrics => {
126-
const aggregatorName = metrics[0].aggregator;
127-
const aggregatorFn = aggregators[aggregatorName];
128-
if (typeof aggregatorFn !== 'function') {
129-
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
130-
}
131-
const aggregatedMetric = aggregatorFn(metrics);
132-
// NB: The 'omit' aggregator returns undefined.
133-
if (aggregatedMetric !== undefined) {
134-
const aggregatedMetricWrapper = Object.assign(
135-
{
136-
get: () => aggregatedMetric,
137-
},
138-
aggregatedMetric,
139-
);
140-
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
141-
}
142-
});
143-
144-
return aggregatedRegistry;
110+
return Registry.aggregate(metricsArr, registryType);
145111
}
146112

147113
/**
@@ -193,15 +159,13 @@ function addListeners() {
193159
// finalize
194160
clearTimeout(request.errorTimeout);
195161

196-
const registry = AggregatorRegistry.aggregate(request.responses);
162+
const registry = Registry.aggregate(request.responses);
197163
const promString = registry.metrics();
198164
request.done(undefined, promString);
199165
}
200166
}
201167
});
202-
}
203-
204-
if (cluster().isWorker) {
168+
} else {
205169
// Respond to master's requests for worker's local metrics.
206170
process.on('message', message => {
207171
if (message.type === GET_METRICS_REQ) {

lib/registry.js

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

3-
const { getValueAsString } = require('./util');
3+
const { getValueAsString, Grouper } = require('./util');
4+
const { aggregators } = require('./metricAggregators');
45

56
class Registry {
67
static get PROMETHEUS_CONTENT_TYPE() {
@@ -216,6 +217,54 @@ class Registry {
216217
metricsToMerge.forEach(mergedRegistry.registerMetric, mergedRegistry);
217218
return mergedRegistry;
218219
}
220+
221+
/**
222+
* Creates a new Registry instance from an array of metrics that were
223+
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
224+
* the method specified by their `aggregator` property, or by summation if
225+
* `aggregator` is undefined.
226+
* @param {Array} metricsArr Array of metrics, each of which created by
227+
* `registry.getMetricsAsJSON()`.
228+
* @param {string} registryType content type of the new registry. Defaults
229+
* to PROMETHEUS_CONTENT_TYPE.
230+
* @returns {Registry} aggregated registry.
231+
*/
232+
static aggregate(
233+
metricsArr,
234+
registryType = Registry.PROMETHEUS_CONTENT_TYPE,
235+
) {
236+
const aggregatedRegistry = new Registry();
237+
const metricsByName = new Grouper();
238+
239+
aggregatedRegistry.setContentType(registryType);
240+
241+
// Gather by name
242+
metricsArr.forEach(metrics => {
243+
metrics.forEach(metric => {
244+
metricsByName.add(metric.name, metric);
245+
});
246+
});
247+
248+
// Aggregate gathered metrics.
249+
metricsByName.forEach(metrics => {
250+
const aggregatorName = metrics[0].aggregator;
251+
const aggregatorFn = aggregators[aggregatorName];
252+
if (typeof aggregatorFn !== 'function') {
253+
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
254+
}
255+
const aggregatedMetric = aggregatorFn(metrics);
256+
// NB: The 'omit' aggregator returns undefined.
257+
if (aggregatedMetric !== undefined) {
258+
const aggregatedMetricWrapper = {
259+
get: () => aggregatedMetric,
260+
...aggregatedMetric,
261+
};
262+
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
263+
}
264+
});
265+
266+
return aggregatedRegistry;
267+
}
219268
}
220269

221270
function formatLabels(labels, exclude) {

0 commit comments

Comments
 (0)