Skip to content

Commit ad7de35

Browse files
committed
Change from a global total storage limit to epic-based storage and date limits
Signed-off-by: Argus Li <contactme@chunkeili.com>
1 parent f689b23 commit ad7de35

File tree

7 files changed

+84
-80
lines changed

7 files changed

+84
-80
lines changed

apps/metrics/config.yml

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,43 +9,54 @@ collector:
99
batch_ms: 60000
1010
batch_max: 500
1111

12-
storage:
13-
retention_days: 5
14-
1512
epics:
1613
- name: "memory"
1714
type: "memory_stats"
1815
poll_ms: 5000
1916
file_prefix: "memory"
17+
data_retention_mb: 15
18+
data_retention_days: 5
2019

2120
- name: "cpu"
2221
type: "info_cpu"
2322
poll_ms: 5000
2423
file_prefix: "cpu"
24+
data_retention_mb: 5
25+
data_retention_days: 5
2526

2627
- name: "commandlog_large_reply"
2728
type: "commandlog_large_reply"
2829
poll_ms: 10000
2930
file_prefix: "commandlog_large_reply"
31+
data_retention_mb: 5
32+
data_retention_days: 5
3033

3134
- name: "commandlog_large_request"
3235
type: "commandlog_large_request"
3336
poll_ms: 10000
3437
file_prefix: "commandlog_large_request"
38+
data_retention_mb: 5
39+
data_retention_days: 5
3540

3641
- name: "commandlog_slow"
3742
type: "commandlog_slow"
3843
poll_ms: 10000
3944
file_prefix: "commandlog_slow"
45+
data_retention_mb: 5
46+
data_retention_days: 5
4047

4148
- name: "slowlog_len"
4249
type: "slowlog_len"
4350
poll_ms: 60000
4451
file_prefix: "slowlog_len"
45-
52+
data_retention_mb: 3
53+
data_retention_days: 5
54+
4655
- name: "monitor"
4756
type: "monitor"
4857
monitoringDuration: 10000
4958
monitoringInterval: 10000
5059
maxCommandsPerRun: 1000000
5160
file_prefix: "monitor"
61+
data_retention_mb: 12
62+
data_retention_days: 5

apps/metrics/src/__tests__/test-helpers.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ export const createMockMonitor = () => ({
2626
export const createMockConfig = (overrides = {}) => ({
2727
valkey: { url: "valkey://localhost:6379" },
2828
server: { port: 3000, data_dir: "/test/data" },
29-
storage: { retention_days: 30, retention_size_mb: 50 },
3029
collector: { batch_ms: 1000, batch_max: 100 },
3130
epics: [],
3231
...overrides,

apps/metrics/src/config.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ const cfgPath = process.env.CONFIG_PATH || path.join(__dirname, "..", "config.ym
1010

1111
let config = null
1212

13+
const EPIC_DEFAULTS = { data_retention_mb: 10, data_retention_days: 30 }
14+
1315
const DEFAULTS = {
1416
backend: { ping_interval: 10000 },
1517
valkey: {},
1618
server: { port: 3000, data_dir: "/app/data" },
1719
collector: { batch_ms: 60000, batch_max: 500 },
18-
storage: { retention_days: 30, retention_size_mb: 50 },
1920
epics: [],
2021
}
2122

@@ -26,19 +27,18 @@ const loadConfig = () => {
2627
const cfg = mergeDeepLeft(parsed, DEFAULTS)
2728

2829
// Type guards
29-
for (const key of ["backend", "valkey", "server", "collector", "storage"]) {
30+
for (const key of ["backend", "valkey", "server", "collector"]) {
3031
if (typeof cfg[key] !== "object" || Array.isArray(cfg[key])) {
3132
cfg[key] = DEFAULTS[key]
3233
}
3334
}
3435
if (!Array.isArray(cfg.epics)) cfg.epics = []
36+
cfg.epics = cfg.epics.map((e) => ({ ...EPIC_DEFAULTS, ...e }))
3537

3638
if (process.env.PORT) cfg.server.port = Number(process.env.PORT)
3739
if (process.env.DATA_DIR) cfg.server.data_dir = process.env.DATA_DIR
3840
if (process.env.BATCH_MS) cfg.collector.batch_ms = Number(process.env.BATCH_MS)
3941
if (process.env.BATCH_MAX) cfg.collector.batch_max = Number(process.env.BATCH_MAX)
40-
if (process.env.RETENTION_DAYS) cfg.storage.retention_days = Number(process.env.RETENTION_DAYS)
41-
if (process.env.RETENTION_SIZE) cfg.storage.retention_size_mb = Number(process.env.RETENTION_SIZE)
4242

4343
if (cfg.logging && typeof cfg.logging === "object") {
4444
if (!process.env.LOG_LEVEL && cfg.logging.level) process.env.LOG_LEVEL = String(cfg.logging.level)

apps/metrics/src/effects/ndjson-cleaner.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,16 @@ const log = createLogger("ndjson-cleaner")
99

1010
let cleanerStopper
1111

12+
const retentionForFile = (fileName, retentionByPrefix) => {
13+
for (const [prefix, days] of retentionByPrefix) {
14+
if (fileName.startsWith(`${prefix}_`)) return days
15+
}
16+
return -1
17+
}
18+
1219
export const setupNdjsonCleaner = ( cfg ) => {
20+
const retentionByPrefix = cfg.epics.map((e) => [e.file_prefix || e.name, e.data_retention_days])
21+
1322
const pipeline$ = timer(0, METRICS_EVICTION_POLICY.INTERVAL).pipe(
1423
exhaustMap(() =>
1524
fs.promises.readdir(cfg.server.data_dir) // get file names from directory
@@ -18,9 +27,10 @@ export const setupNdjsonCleaner = ( cfg ) => {
1827
.then((fileNames) => Promise.all(fileNames.map(
1928
async (fileName) => ({ stats: await fs.promises.stat(path.join(cfg.server.data_dir, fileName)), fileName }),
2029
)))
21-
// filter out files that are not expired
22-
.then((filesWithStats) => filesWithStats.reduce((acc, { stats, fileName }) => {
23-
if (stats.birthtime < Date.now() - cfg.storage.retention_days * MILLISECONDS_IN_A_DAY) acc.push(fileName)
30+
// filter out files that are expired based on their epic's retention
31+
.then((filesWithStats) => filesWithStats.reduce((acc, { stats, fileName }) => {
32+
const days = retentionForFile(fileName, retentionByPrefix)
33+
if (days >= 0 && stats.birthtime < Date.now() - days * MILLISECONDS_IN_A_DAY) acc.push(fileName)
2434
return acc
2535
}, []))
2636
// delete expired files

apps/metrics/src/effects/ndjson-cleaner.test.js

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ describe("ndjson-cleaner", () => {
3333

3434
cfg = {
3535
server: { data_dir: "/app/data" },
36-
storage: { retention_days: 30 },
36+
epics: [
37+
{ name: "memory", file_prefix: "memory", data_retention_days: 30 },
38+
{ name: "cpu", file_prefix: "cpu", data_retention_days: 30 },
39+
],
3740
}
3841
})
3942

@@ -69,8 +72,8 @@ describe("ndjson-cleaner", () => {
6972

7073
it("keeps .ndjson files with recent birthtime", async () => {
7174
fs.promises.readdir.mockResolvedValue([
72-
"notes.ndjson",
73-
"backup.ndjson",
75+
"memory_20250614.ndjson",
76+
"cpu_20250614.ndjson",
7477
])
7578
fs.promises.stat.mockResolvedValue({ birthtime: new Date("2025-06-14") })
7679

@@ -101,11 +104,11 @@ describe("ndjson-cleaner", () => {
101104
// now = 2025-06-15T12:00:00.000Z, data_retention_days = 30 → cutoff = 2025-05-16T12:00:00.000Z
102105
// File with birthtime at cutoff should be kept, file before cutoff should be deleted
103106
fs.promises.readdir.mockResolvedValue([
104-
"at_cutoff.ndjson",
105-
"before_cutoff.ndjson",
107+
"memory_20250516.ndjson",
108+
"memory_20250516_1.ndjson",
106109
])
107110
fs.promises.stat.mockImplementation((filePath) => {
108-
if (filePath.endsWith("at_cutoff.ndjson")) {
111+
if (filePath.endsWith("memory_20250516.ndjson")) {
109112
return Promise.resolve({ birthtime: new Date("2025-05-16T12:00:00.000Z") })
110113
}
111114
return Promise.resolve({ birthtime: new Date("2025-05-16T11:59:59.999Z") })
@@ -116,9 +119,25 @@ describe("ndjson-cleaner", () => {
116119

117120
await vi.advanceTimersByTimeAsync(0)
118121

119-
// Only before_cutoff should be deleted (< cutoff), at_cutoff should be kept (== cutoff)
122+
// Only the file before cutoff should be deleted (< cutoff), the one at cutoff should be kept (== cutoff)
120123
expect(fs.promises.unlink).toHaveBeenCalledTimes(1)
121-
expect(fs.promises.unlink).toHaveBeenCalledWith("/app/data/before_cutoff.ndjson")
124+
expect(fs.promises.unlink).toHaveBeenCalledWith("/app/data/memory_20250516_1.ndjson")
125+
126+
stopNdjsonCleaner()
127+
})
128+
129+
it("skips files with no matching epic prefix", async () => {
130+
fs.promises.readdir.mockResolvedValue([
131+
"unknown_20250101.ndjson",
132+
])
133+
fs.promises.stat.mockResolvedValue({ birthtime: new Date("2025-01-01") })
134+
135+
const { setupNdjsonCleaner, stopNdjsonCleaner } = await import("./ndjson-cleaner.js")
136+
setupNdjsonCleaner(cfg)
137+
138+
await vi.advanceTimersByTimeAsync(0)
139+
140+
expect(fs.promises.unlink).not.toHaveBeenCalled()
122141

123142
stopNdjsonCleaner()
124143
})
@@ -227,8 +246,8 @@ describe("ndjson-cleaner", () => {
227246

228247
it("logs error when individual unlink fails", async () => {
229248
fs.promises.readdir.mockResolvedValue([
230-
"old_a.ndjson",
231-
"old_b.ndjson",
249+
"memory_20250101.ndjson",
250+
"cpu_20250101.ndjson",
232251
])
233252
fs.promises.stat.mockResolvedValue({ birthtime: new Date("2025-01-01") })
234253
fs.promises.unlink

apps/metrics/src/init-collectors.js

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,50 +21,17 @@ const updateCollectorMeta = (name, patch) => {
2121
return next
2222
}
2323

24-
// Metric files capacity weights per metric
25-
const CAPACITY_WEIGHTS = {
26-
memory: 0.30,
27-
monitor: 0.25,
28-
commandlog_slow: 0.10,
29-
commandlog_large_reply: 0.10,
30-
commandlog_large_request: 0.10,
31-
cpu: 0.10,
32-
slowlog_len: 0.05,
33-
}
34-
3524
const MIN_FILE_SIZE = 256 * 1024 // 256 KB
3625
const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB
3726
const MIN_FILES = 4
3827

39-
const computeCapacity = (weight, totalMb) => {
40-
const capacityBytes = weight * totalMb * 1024 * 1024
28+
const computeCapacity = (retentionMb) => {
29+
const capacityBytes = retentionMb * 1024 * 1024
4130
const maxFileSize = Math.min(MAX_FILE_SIZE, Math.max(MIN_FILE_SIZE, Math.floor(capacityBytes / MIN_FILES)))
4231
const maxFiles = Math.max(MIN_FILES, Math.floor(capacityBytes / maxFileSize))
4332
return { maxFiles, maxFileSize }
4433
}
4534

46-
const warnIfStorageTooSmall = (retentionSizeMb, weights) => {
47-
const smallestWeight = Math.min(...Object.values(weights))
48-
const minTotalBytes = (MIN_FILE_SIZE * MIN_FILES) / smallestWeight
49-
const minTotalMb = Math.ceil(minTotalBytes / (1024 * 1024))
50-
if (retentionSizeMb < minTotalMb) {
51-
console.warn(
52-
`retention_size_mb ${retentionSizeMb} is below minimum ${minTotalMb} for active epics — ` +
53-
`per-metric capacity will be clamped to at least ${MIN_FILES} files × ${MIN_FILE_SIZE / 1024} KB`,
54-
)
55-
}
56-
}
57-
58-
const normalizeWeights = (epics) => {
59-
const activeEpicPrefixes = epics.map((e) => e.file_prefix || e.name)
60-
const rawTotal = activeEpicPrefixes.reduce((sum, epicPrefix) => sum + (CAPACITY_WEIGHTS[epicPrefix] ?? 0), 0)
61-
if (rawTotal === 0) {
62-
const equal = 1 / activeEpicPrefixes.length
63-
return Object.fromEntries(activeEpicPrefixes.map((p) => [p, equal]))
64-
}
65-
return Object.fromEntries(activeEpicPrefixes.map((p) => [p, (CAPACITY_WEIGHTS[p] ?? 0) / rawTotal]))
66-
}
67-
6835
// Use it in endpoints to return metadata to server then to UI
6936
// to show when the data was collected and will be refreshed
7037
export const getCollectorMeta = (name) => collectorsState[name]
@@ -75,18 +42,15 @@ updateCollectorMeta(MONITOR, {
7542
isRunning: false,
7643
})
7744
const startMonitor = (cfg) => {
78-
const weights = normalizeWeights(cfg.epics)
79-
warnIfStorageTooSmall(cfg.storage.retention_size_mb, weights)
80-
const { maxFiles, maxFileSize } = computeCapacity(weights[MONITOR], cfg.storage.retention_size_mb)
45+
const monitorEpic = cfg.epics.find((e) => e.name === MONITOR)
46+
const { maxFiles, maxFileSize } = computeCapacity(monitorEpic.data_retention_mb)
8147
const nd = makeNdjsonWriter({
8248
dataDir: cfg.server.data_dir,
83-
filePrefix: MONITOR,
49+
filePrefix: monitorEpic.file_prefix || MONITOR,
8450
maxFiles,
8551
maxFileSize,
8652
})
8753

88-
const monitorEpic = cfg.epics.find((e) => e.name === MONITOR)
89-
9054
const sink = {
9155
appendRows: async (rows) => {
9256
await nd.appendRows(rows)
@@ -145,15 +109,13 @@ const startMonitor = (cfg) => {
145109
const stopMonitor = async () => await monitorStopper()
146110

147111
const setupCollectors = async (client, cfg) => {
148-
const weights = normalizeWeights(cfg.epics)
149-
warnIfStorageTooSmall(cfg.storage.retention_size_mb, weights)
150112
const fetcher = makeFetcher(client)
151113
await Promise.all(cfg.epics
152114
.filter((f) => f.name !== MONITOR && fetcher[f.type])
153115
.map(async (f) => {
154116
const fn = fetcher[f.type]
155117
const prefix = f.file_prefix || f.name
156-
const { maxFiles, maxFileSize } = computeCapacity(weights[prefix], cfg.storage.retention_size_mb)
118+
const { maxFiles, maxFileSize } = computeCapacity(f.data_retention_mb)
157119
const nd = makeNdjsonWriter({
158120
dataDir: cfg.server.data_dir,
159121
filePrefix: prefix,

0 commit comments

Comments
 (0)