Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions tools/valkey-metrics/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import express from "express"
import { createClient } from "@valkey/client"
import { loadConfig } from "./config.js"
import * as Streamer from "./effects/ndjson-streamer.js"
import { setupCollectors } from "./init-collectors.js"
import { setupCollectors, startMonitor, stopMonitor } from "./init-collectors.js"
import { calculateHotKeys } from "./analyzers/calculateHotKeys.js"

const cfg = loadConfig()
Expand Down Expand Up @@ -68,14 +68,38 @@ app.get('/slowlog_len', async (_req, res) => {
}
})

app.get('/monitor', async (_req, res) => {
let monitorRunning = false
app.get('/monitor', async (req, res) => {
const action = req.query.action
try {
const rows = await Streamer.monitor();
res.json({ rows });
switch (action) {
case 'start':
if (monitorRunning) {
return res.json({ status: 'Monitor already running.' })
}
await startMonitor()
monitorRunning = true
return res.json({ status: 'Monitor started' })

case 'stop':
if (!monitorRunning) {
return res.json({ status: 'Monitor is already stopped.' })
}
await stopMonitor()
monitorRunning = false
return res.json({ status: 'Monitor stopped.' })

case 'status':
return res.json({ running: monitorRunning })

default:
return res.status(400).json({ error: 'Invalid action. Use ?action=start|stop|status' })
}
} catch (e) {
res.status(500).json({ error: e.message });
console.error(`[monitor] ${action} error:`, e)
return res.status(500).json({ error: e.message })
}
});
})

app.get('/hot-keys', async (_req, res) => {
try {
Expand Down
101 changes: 46 additions & 55 deletions tools/valkey-metrics/src/init-collectors.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,56 @@ import { startCollector } from "./epics/collector-rx.js"
import { loadConfig } from "./config.js"

const cfg = loadConfig()
const MONITOR = "monitor"
const stoppers = {}

const startMonitor = () => {
const nd = makeNdjsonWriter({
dataDir: cfg.server.data_dir,
filePrefix: MONITOR
})
const monitorEpic = cfg.epics.find(e => e.name === MONITOR)
const sink = {
appendRows: async rows => {
await nd.appendRows(rows)
console.info(`[${monitorEpic.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${monitorEpic.file_prefix || monitorEpic.name}`)
},
close: nd.close
}
const stream$ = makeMonitorStream(async logs => {
await sink.appendRows(logs)
}, monitorEpic)
const subscription = stream$.subscribe({
next: logs => console.info(`[${monitorEpic.name}] monitor cycle complete (${logs.length} logs)`),
error: err => console.error(`[${monitorEpic.name}] monitor error:`, err),
complete: () => console.info(`[${monitorEpic.name}] monitor completed`),
})
stoppers[monitorEpic.name] = async () => {
console.info(`[${monitorEpic.name}] stopping monitor...`)
subscription.unsubscribe()
await sink.close()
}
}

const stopMonitor = async () => await stoppers[MONITOR]()

const setupCollectors = async client => {
const fetcher = makeFetcher(client)

const stoppers = {}

// here we start data collection epics per each config with corresponding stat fetchers
for (const f of cfg.epics) {
let fn, sink
const nd = makeNdjsonWriter({
dataDir: cfg.server.data_dir,
filePrefix: f.file_prefix || f.name
})
if(f.type === "monitor"){
sink = {
appendRows: async rows => {
await nd.appendRows(rows)
console.info(`[${f.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${f.file_prefix || f.name}`)
},
close: nd.close
}
const stream$ = makeMonitorStream(async logs => {
await sink.appendRows(logs)
}, f)
const subscription = stream$.subscribe({
next: logs => console.info(`[${f.name}] monitor cycle complete (${logs.length} logs)`),
error: err => console.error(`[${f.name}] monitor error:`, err),
complete: () => console.info(`[${f.name}] monitor completed`),
await Promise.all(cfg.epics
.filter(f => f.name !== MONITOR && fetcher[f.type])
.map(async f => {
const fn = fetcher[f.type]
const nd = makeNdjsonWriter({
dataDir: cfg.server.data_dir,
filePrefix: f.file_prefix || f.name
})
stoppers[f.name] = async () => {
console.info(`[${f.name}] stopping monitor...`)
subscription.unsubscribe()
await sink.close()
}
}
else {
fn = fetcher[f.type]
if (!fn) {
console.warn(`unknown epic type ${f.type} for ${f.name}, skipping`)
continue
}
// write NDJSON files; if we need to ingest into memory for some reason — do it here
sink = {
appendRows: async rows => {
await nd.appendRows(rows)
},
const sink = {
appendRows: async rows => nd.appendRows(rows),
close: nd.close
}

// collect the first values immediately
try {
const rows = await fn()
await sink.appendRows(rows)
} catch (e) {
console.error(`[${f.name}] error`, e?.message || e)
}

// then start a corresponding epic to poll on `pollMs` interval
const rows = await fn()
await sink.appendRows(rows)
stoppers[f.name] = startCollector({
name: f.name,
pollMs: f.poll_ms,
Expand All @@ -71,10 +63,9 @@ const setupCollectors = async client => {
batchMs: cfg.collector.batch_ms,
batchMax: cfg.collector.batch_max
})
}
}

})
)
return stoppers
}

export { setupCollectors }
export { setupCollectors, startMonitor, stopMonitor }
Loading