diff --git a/tools/valkey-metrics/src/index.js b/tools/valkey-metrics/src/index.js index 771b7d58..425b1035 100644 --- a/tools/valkey-metrics/src/index.js +++ b/tools/valkey-metrics/src/index.js @@ -3,7 +3,7 @@ import express from "express" import { createClient } from "@valkey/client" import { loadConfig } from "./config.js" import * as Streamer from "./effects/ndjson-streamer.js" -import { setupCollectors } from "./init-collectors.js" +import { setupCollectors, startMonitor, stopMonitor } from "./init-collectors.js" import { calculateHotKeys } from "./analyzers/calculateHotKeys.js" const cfg = loadConfig() @@ -68,14 +68,38 @@ app.get('/slowlog_len', async (_req, res) => { } }) -app.get('/monitor', async (_req, res) => { +let monitorRunning = false +app.get('/monitor', async (req, res) => { + const action = req.query.action try { - const rows = await Streamer.monitor(); - res.json({ rows }); + switch (action) { + case 'start': + if (monitorRunning) { + return res.json({ status: 'Monitor already running.' }) + } + await startMonitor() + monitorRunning = true + return res.json({ status: 'Monitor started' }) + + case 'stop': + if (!monitorRunning) { + return res.json({ status: 'Monitor is already stopped.' }) + } + await stopMonitor() + monitorRunning = false + return res.json({ status: 'Monitor stopped.' }) + + case 'status': + return res.json({ running: monitorRunning }) + + default: + return res.status(400).json({ error: 'Invalid action. Use ?action=start|stop|status' }) + } } catch (e) { - res.status(500).json({ error: e.message }); + console.error(`[monitor] ${action} error:`, e) + return res.status(500).json({ error: e.message }) } -}); +}) app.get('/hot-keys', async (_req, res) => { try { diff --git a/tools/valkey-metrics/src/init-collectors.js b/tools/valkey-metrics/src/init-collectors.js index 46a8d0cb..e689b836 100644 --- a/tools/valkey-metrics/src/init-collectors.js +++ b/tools/valkey-metrics/src/init-collectors.js @@ -5,64 +5,56 @@ import { startCollector } from "./epics/collector-rx.js" import { loadConfig } from "./config.js" const cfg = loadConfig() +const MONITOR = "monitor" +const stoppers = {} + +const startMonitor = () => { + const nd = makeNdjsonWriter({ + dataDir: cfg.server.data_dir, + filePrefix: MONITOR + }) + const monitorEpic = cfg.epics.find(e => e.name === MONITOR) + const sink = { + appendRows: async rows => { + await nd.appendRows(rows) + console.info(`[${monitorEpic.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${monitorEpic.file_prefix || monitorEpic.name}`) + }, + close: nd.close + } + const stream$ = makeMonitorStream(async logs => { + await sink.appendRows(logs) + }, monitorEpic) + const subscription = stream$.subscribe({ + next: logs => console.info(`[${monitorEpic.name}] monitor cycle complete (${logs.length} logs)`), + error: err => console.error(`[${monitorEpic.name}] monitor error:`, err), + complete: () => console.info(`[${monitorEpic.name}] monitor completed`), + }) + stoppers[monitorEpic.name] = async () => { + console.info(`[${monitorEpic.name}] stopping monitor...`) + subscription.unsubscribe() + await sink.close() + } +} + +const stopMonitor = async () => await stoppers[MONITOR]() const setupCollectors = async client => { const fetcher = makeFetcher(client) - const stoppers = {} - - // here we start data collection epics per each config with corresponding stat fetchers - for (const f of cfg.epics) { - let fn, sink - const nd = makeNdjsonWriter({ - dataDir: cfg.server.data_dir, - filePrefix: f.file_prefix || f.name - }) - if(f.type === "monitor"){ - sink = { - appendRows: async rows => { - await nd.appendRows(rows) - console.info(`[${f.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${f.file_prefix || f.name}`) - }, - close: nd.close - } - const stream$ = makeMonitorStream(async logs => { - await sink.appendRows(logs) - }, f) - const subscription = stream$.subscribe({ - next: logs => console.info(`[${f.name}] monitor cycle complete (${logs.length} logs)`), - error: err => console.error(`[${f.name}] monitor error:`, err), - complete: () => console.info(`[${f.name}] monitor completed`), + await Promise.all(cfg.epics + .filter(f => f.name !== MONITOR && fetcher[f.type]) + .map(async f => { + const fn = fetcher[f.type] + const nd = makeNdjsonWriter({ + dataDir: cfg.server.data_dir, + filePrefix: f.file_prefix || f.name }) - stoppers[f.name] = async () => { - console.info(`[${f.name}] stopping monitor...`) - subscription.unsubscribe() - await sink.close() - } - } - else { - fn = fetcher[f.type] - if (!fn) { - console.warn(`unknown epic type ${f.type} for ${f.name}, skipping`) - continue - } - // write NDJSON files; if we need to ingest into memory for some reason — do it here - sink = { - appendRows: async rows => { - await nd.appendRows(rows) - }, + const sink = { + appendRows: async rows => nd.appendRows(rows), close: nd.close } - - // collect the first values immediately - try { - const rows = await fn() - await sink.appendRows(rows) - } catch (e) { - console.error(`[${f.name}] error`, e?.message || e) - } - - // then start a corresponding epic to poll on `pollMs` interval + const rows = await fn() + await sink.appendRows(rows) stoppers[f.name] = startCollector({ name: f.name, pollMs: f.poll_ms, @@ -71,10 +63,9 @@ const setupCollectors = async client => { batchMs: cfg.collector.batch_ms, batchMax: cfg.collector.batch_max }) - } -} - + }) + ) return stoppers } -export { setupCollectors } \ No newline at end of file +export { setupCollectors, startMonitor, stopMonitor } \ No newline at end of file