diff --git a/tools/valkey-metrics/config.yml b/tools/valkey-metrics/config.yml index 517c6b85..c437e23d 100644 --- a/tools/valkey-metrics/config.yml +++ b/tools/valkey-metrics/config.yml @@ -36,4 +36,4 @@ epics: monitoringDuration: 60000 monitoringInterval: 60000 maxCommandsPerRun: 1000000 - monitor: "monitor" + file_prefix: "monitor" diff --git a/tools/valkey-metrics/src/analyzers/calculateHotKeys.js b/tools/valkey-metrics/src/analyzers/calculateHotKeys.js new file mode 100644 index 00000000..ada7d6be --- /dev/null +++ b/tools/valkey-metrics/src/analyzers/calculateHotKeys.js @@ -0,0 +1,24 @@ +import * as R from 'ramda' +import * as Streamer from '../effects/ndjson-streamer.js' + +export const calculateHotKeys = async () => { + const rows = await Streamer.monitor() + const ACCESS_COMMANDS = ["get", "set", "mget", "hget", "hgetall", "hmget", "json.get", "json.mget"] + const CUT_OFF_FREQUENCY = 1 + + return R.pipe( + R.reduce((acc, { ts, command }) => { + const [cmd, ...args] = command.split(' ').filter(Boolean) + if (ACCESS_COMMANDS.includes(cmd.trim().toLowerCase())) { + args.forEach(key => { + acc[key] = acc[key] ? acc[key] + 1 : 1 + }) + } + return acc + }, {}), + R.toPairs, + R.sort(R.descend(R.last)), + R.reject(([key, count]) => count <= CUT_OFF_FREQUENCY) + )(rows) +} + diff --git a/tools/valkey-metrics/src/effects/ndjson-reader.js b/tools/valkey-metrics/src/effects/ndjson-reader.js index bb04f30e..248c0104 100644 --- a/tools/valkey-metrics/src/effects/ndjson-reader.js +++ b/tools/valkey-metrics/src/effects/ndjson-reader.js @@ -1,15 +1,9 @@ import path from "node:path" import { readFile } from "node:fs/promises" +import { ymd } from "../utils/helpers" const DATA_DIR = process.env.METRICS_DIR || path.resolve(process.cwd(), "data") -const ymd = d => { - const y = d.getFullYear() - const m = String(d.getMonth() + 1).padStart(2, "0") - const day = String(d.getDate()).padStart(2, "0") - return `${y}${m}${day}` // "20250924" -} - const fileFor = (prefix, date) => path.join(DATA_DIR, `${prefix}_${ymd(date)}.ndjson`) // read file text or "" (ENOENT-safe) diff --git a/tools/valkey-metrics/src/effects/ndjson-streamer.js b/tools/valkey-metrics/src/effects/ndjson-streamer.js new file mode 100644 index 00000000..9daeae61 --- /dev/null +++ b/tools/valkey-metrics/src/effects/ndjson-streamer.js @@ -0,0 +1,40 @@ +import fs from "node:fs"; +import readline from "node:readline"; +import path from "node:path"; +import { ymd } from "../utils/helpers.js"; + +const DATA_DIR = process.env.METRICS_DIR || path.resolve(process.cwd(), "data"); + +const fileFor = (prefix, date) => path.join(DATA_DIR, `${prefix}_${ymd(date)}.ndjson`); + +export async function streamNdjson(prefix, filterFn = () => true) { + const today = new Date(); + const yesterday = new Date(today); + yesterday.setDate(today.getDate() - 1); + + const files = [fileFor(prefix, yesterday), fileFor(prefix, today)]; + const results = []; + + for (const file of files) { + if (!fs.existsSync(file)) continue; + + const fileStream = fs.createReadStream(file, { encoding: "utf8" }); + const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); + + for await (const line of rl) { + if (!line.trim()) continue; + try { + const obj = JSON.parse(line); + if (filterFn(obj)) results.push(obj) + + } catch { + // ignore bad lines + } + } + } + + return results; // JSON array of all objects +} + +export const [memory_stats, info_cpu, slowlog_len, slowlog_get, monitor] = + ['memory', 'cpu', 'slowlog_len', 'slowlog', 'monitor'].map(filePrefix => () => streamNdjson(filePrefix)) \ No newline at end of file diff --git a/tools/valkey-metrics/src/effects/ndjson-writer.js b/tools/valkey-metrics/src/effects/ndjson-writer.js index 20ff1328..6659170d 100644 --- a/tools/valkey-metrics/src/effects/ndjson-writer.js +++ b/tools/valkey-metrics/src/effects/ndjson-writer.js @@ -5,7 +5,7 @@ import * as R from "ramda" const dayStr = ts => new Date(ts).toISOString().slice(0, 10).replace(/-/g, "") export const makeNdjsonWriter = ({ dataDir, filePrefix }) => { - const fileFor = ts => path.join(dataDir, `${filePrefix}_${dayStr(ts)}.ndjson`) + const fileFor = ts => path.join(dataDir, `${filePrefix}_${dayStr(ts, filePrefix)}.ndjson`) const appendRows = async (rows = []) => { if (R.isEmpty(rows.length)) return diff --git a/tools/valkey-metrics/src/index.js b/tools/valkey-metrics/src/index.js index 21b76c0a..771b7d58 100644 --- a/tools/valkey-metrics/src/index.js +++ b/tools/valkey-metrics/src/index.js @@ -2,8 +2,9 @@ import fs from "fs" import express from "express" import { createClient } from "@valkey/client" import { loadConfig } from "./config.js" -import * as Reader from "./effects/ndjson-reader.js" +import * as Streamer from "./effects/ndjson-streamer.js" import { setupCollectors } from "./init-collectors.js" +import { calculateHotKeys } from "./analyzers/calculateHotKeys.js" const cfg = loadConfig() @@ -32,7 +33,7 @@ app.get("/health", (req, res) => res.json({ ok: true })) app.get('/memory', async (_req, res) => { try { - const rows = await Reader.memory_stats() + const rows = await Streamer.memory_stats() res.json({ rows }) } catch (e) { res.status(500).json({ error: e.message }) @@ -41,7 +42,7 @@ app.get('/memory', async (_req, res) => { app.get('/cpu', async (_req, res) => { try { - const rows = await Reader.info_cpu() + const rows = await Streamer.info_cpu() res.json({ rows }) } catch (e) { res.status(500).json({ error: e.message }) @@ -51,7 +52,7 @@ app.get('/cpu', async (_req, res) => { app.get('/slowlog', async (req, res) => { try { const count = Number(req.query.count) || 50 - const rows = await Reader.slowlog_get(count) + const rows = await Streamer.slowlog_get(count) res.json({ count: Math.max(1, Math.min(500, count)), rows }) } catch (e) { res.status(500).json({ error: e.message }) @@ -60,7 +61,7 @@ app.get('/slowlog', async (req, res) => { app.get('/slowlog_len', async (_req, res) => { try { - const rows = await Reader.slowlog_len() + const rows = await Streamer.slowlog_len() res.json({ rows }) } catch (e) { res.status(500).json({ error: e.message }) @@ -69,13 +70,22 @@ app.get('/slowlog_len', async (_req, res) => { app.get('/monitor', async (_req, res) => { try { - const rows = await Reader.monitor(); + const rows = await Streamer.monitor(); res.json({ rows }); } catch (e) { res.status(500).json({ error: e.message }); } }); +app.get('/hot-keys', async (_req, res) => { + try { + const hotkeys = await calculateHotKeys() + res.json(hotkeys) + } catch (e) { + res.status(500).json({error: e.message}) + } +}) + const port = Number(cfg.server.port || 3000) const server = app.listen(port, () => { console.log(`listening on http://0.0.0.0:${port}`) diff --git a/tools/valkey-metrics/src/init-collectors.js b/tools/valkey-metrics/src/init-collectors.js index 3944afe3..46a8d0cb 100644 --- a/tools/valkey-metrics/src/init-collectors.js +++ b/tools/valkey-metrics/src/init-collectors.js @@ -21,7 +21,7 @@ const setupCollectors = async client => { if(f.type === "monitor"){ sink = { appendRows: async rows => { - await nd.appendRows(rows, { newFile: true }) // new file per batch + await nd.appendRows(rows) console.info(`[${f.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${f.file_prefix || f.name}`) }, close: nd.close diff --git a/tools/valkey-metrics/src/utils/helpers.js b/tools/valkey-metrics/src/utils/helpers.js new file mode 100644 index 00000000..eb4bbc43 --- /dev/null +++ b/tools/valkey-metrics/src/utils/helpers.js @@ -0,0 +1,6 @@ +export const ymd = d => { + const y = d.getFullYear() + const m = String(d.getMonth() + 1).padStart(2, "0") + const day = String(d.getDate()).padStart(2, "0") + return `${y}${m}${day}` // "20250924" +}