diff --git a/apps/frontend/src/state/epics/rootEpic.ts b/apps/frontend/src/state/epics/rootEpic.ts index 4cf2dc84..9d617f49 100644 --- a/apps/frontend/src/state/epics/rootEpic.ts +++ b/apps/frontend/src/state/epics/rootEpic.ts @@ -1,7 +1,15 @@ import { merge } from "rxjs" import { wsConnectionEpic } from "./wsEpics" -import { connectionEpic, sendRequestEpic, setDataEpic, deleteConnectionEpic, autoReconnectEpic, valkeyRetryEpic, - updateConnectionDetailsEpic } from "./valkeyEpics" +import { + connectionEpic, + sendRequestEpic, + setDataEpic, + deleteConnectionEpic, + updateConnectionDetailsEpic, + autoReconnectEpic, + valkeyRetryEpic, + getHotKeysEpic +} from "./valkeyEpics" import { keyBrowserEpic } from "./keyBrowserEpic" import type { Store } from "@reduxjs/toolkit" @@ -15,6 +23,7 @@ export const registerEpics = (store: Store) => { updateConnectionDetailsEpic(store), sendRequestEpic(), setDataEpic(), + getHotKeysEpic(), keyBrowserEpic(), ).subscribe({ error: (err) => console.error("Epic error:", err), diff --git a/apps/frontend/src/state/epics/valkeyEpics.ts b/apps/frontend/src/state/epics/valkeyEpics.ts index 579ab38a..7f40fcfd 100644 --- a/apps/frontend/src/state/epics/valkeyEpics.ts +++ b/apps/frontend/src/state/epics/valkeyEpics.ts @@ -19,6 +19,7 @@ import { setData } from "../valkey-features/info/infoSlice" import { action$, select } from "../middleware/rxjsMiddleware/rxjsMiddlware" import { setClusterData } from "../valkey-features/cluster/clusterSlice" import { connectFulfilled as wsConnectFulfilled } from "../wsconnection/wsConnectionSlice" +import { hotKeysRequested } from "../valkey-features/hotkeys/hotKeysSlice.ts" import history from "../../history.ts" import type { Store } from "@reduxjs/toolkit" @@ -249,3 +250,12 @@ export const setDataEpic = () => history.navigate(dashboardPath) }), ) + +export const getHotKeysEpic = () => + action$.pipe( + select(hotKeysRequested), + tap((action) => { + const socket = getSocket() + socket.next(action) + }), + ) diff --git a/apps/frontend/src/state/valkey-features/hotkeys/hotKeysSlice.ts b/apps/frontend/src/state/valkey-features/hotkeys/hotKeysSlice.ts new file mode 100644 index 00000000..f493708e --- /dev/null +++ b/apps/frontend/src/state/valkey-features/hotkeys/hotKeysSlice.ts @@ -0,0 +1,60 @@ +import { createSlice } from "@reduxjs/toolkit" +import { type JSONObject } from "@common/src/json-utils" +import { VALKEY } from "@common/src/constants.ts" +import * as R from "ramda" +import type { RootState } from "@/store.ts" + +export const selectHotKeys = (id: string) => (state: RootState) => + R.path([VALKEY.HOTKEYS.name, id, "hotKeys"], state) + +interface HotKeysState { + [connectionId: string]: { + hotKeys: [string, number][] + checkAt: string|null, + monitorRunning: boolean, + nodeId: string|null, + error?: JSONObject | null, + } +} + +const initialHotKeysState: HotKeysState = {} + +const hotKeysSlice = createSlice({ + name: "hotKeys", + initialState: initialHotKeysState, + reducers: { + hotKeysRequested: (state, action) => { + const connectionId = action.payload.connectionId + if (!state[connectionId]) { + state[connectionId] = { + hotKeys: [], + checkAt: null, + monitorRunning: false, + nodeId: null, + } + } + }, + hotKeysFulfilled: (state, action) => { + const { hotKeys, monitorRunning, checkAt, nodeId } = action.payload.parsedResponse + console.log("Parsed response in the frontend is: ", action.payload.parsedResponse) + const connectionId = action.payload.connectionId + state[connectionId] = { + hotKeys, + checkAt, + monitorRunning, + nodeId, + } + + }, + hotKeysError: (state, action) => { + const { connectionId, error } = action.payload + state[connectionId].error = error + }, + }, +}) +export default hotKeysSlice.reducer +export const { + hotKeysRequested, + hotKeysFulfilled, + hotKeysError, +} = hotKeysSlice.actions diff --git a/apps/frontend/src/store.ts b/apps/frontend/src/store.ts index 99979ef9..f03fe9c9 100644 --- a/apps/frontend/src/store.ts +++ b/apps/frontend/src/store.ts @@ -8,6 +8,7 @@ import clusterReducer from "@/state/valkey-features/cluster/clusterSlice" import valkeyCommandReducer from "@/state/valkey-features/command/commandSlice.ts" import valkeyInfoReducer from "@/state/valkey-features/info/infoSlice.ts" import keyBrowserReducer from "@/state/valkey-features/keys/keyBrowserSlice.ts" +import hotKeysReducer from "@/state/valkey-features/hotkeys/hotKeysSlice.ts" export const store = configureStore({ reducer: { @@ -17,6 +18,7 @@ export const store = configureStore({ [VALKEY.STATS.name]: valkeyInfoReducer, [VALKEY.KEYS.name]: keyBrowserReducer, [VALKEY.CLUSTER.name]: clusterReducer, + [VALKEY.HOTKEYS.name]: hotKeysReducer, }, middleware: (getDefaultMiddleware) => { return getDefaultMiddleware({ diff --git a/apps/metrics/src/analyzers/calculateHotKeys.js b/apps/metrics/src/analyzers/calculateHotKeys.js index ada7d6be..24f6fd88 100644 --- a/apps/metrics/src/analyzers/calculateHotKeys.js +++ b/apps/metrics/src/analyzers/calculateHotKeys.js @@ -5,7 +5,6 @@ export const calculateHotKeys = async () => { const rows = await Streamer.monitor() const ACCESS_COMMANDS = ["get", "set", "mget", "hget", "hgetall", "hmget", "json.get", "json.mget"] const CUT_OFF_FREQUENCY = 1 - return R.pipe( R.reduce((acc, { ts, command }) => { const [cmd, ...args] = command.split(' ').filter(Boolean) diff --git a/apps/metrics/src/effects/ndjson-reader.js b/apps/metrics/src/effects/ndjson-reader.js deleted file mode 100644 index 248c0104..00000000 --- a/apps/metrics/src/effects/ndjson-reader.js +++ /dev/null @@ -1,40 +0,0 @@ -import path from "node:path" -import { readFile } from "node:fs/promises" -import { ymd } from "../utils/helpers" - -const DATA_DIR = process.env.METRICS_DIR || path.resolve(process.cwd(), "data") - -const fileFor = (prefix, date) => path.join(DATA_DIR, `${prefix}_${ymd(date)}.ndjson`) - -// read file text or "" (ENOENT-safe) -const readText = async file => { - try { - return await readFile(file, "utf8") - } catch (e) { - return e?.code === "ENOENT" ? "" : Promise.reject(e) - } -} - -// parse once after concatenation; ignore any bad/partial line at the tail -const parseNdjson = txt => - txt.split(/\r?\n/) - .filter(Boolean) // sometimes, "" == false is a feature, not a bug - .map(line => { try { return JSON.parse(line) } catch { return null } }) - .filter(Boolean) - -// today's rows, then yesterday's rows appended (not sorting anything as they are monotonically appended anyway) -const readTwoDaysRows = async prefix => { - const today = new Date() - const yesterday = new Date(today); yesterday.setDate(today.getDate() - 1) - - const [t, y] = await Promise.all([ - readText(fileFor(prefix, today)), - readText(fileFor(prefix, yesterday)) - ]) - - return parseNdjson([t, y].filter(Boolean).join('\n')) -} - -// Reader API to match express public API -export const [memory_stats, info_cpu, slowlog_len, slowlog_get, monitor] = - ['memory', 'cpu', 'slowlog_len', 'slowlog', 'monitor'].map(filePrefix => () => readTwoDaysRows(filePrefix)) \ No newline at end of file diff --git a/apps/metrics/src/effects/ndjson-streamer.js b/apps/metrics/src/effects/ndjson-streamer.js index 33ca77c9..9314bc29 100644 --- a/apps/metrics/src/effects/ndjson-streamer.js +++ b/apps/metrics/src/effects/ndjson-streamer.js @@ -1,13 +1,14 @@ import fs from "node:fs"; import readline from "node:readline"; import path from "node:path"; -import { loadConfig } from "../config.js"; + +const DATA_DIR = process.env.DATA_DIR || path.resolve(process.cwd(), "data") const dayStr = (date) => date.toISOString().slice(0, 10).replace(/-/g, ""); + const fileFor = (prefix, date) => { - const cfg = loadConfig(); - const dataDir = cfg.server.data_dir; + const dataDir = DATA_DIR return path.join(dataDir, `${prefix}_${dayStr(date)}.ndjson`); }; @@ -17,11 +18,11 @@ export async function streamNdjson(prefix, filterFn = () => true) { yesterday.setDate(today.getDate() - 1); const files = [fileFor(prefix, yesterday), fileFor(prefix, today)]; + const results = []; for (const file of files) { if (!fs.existsSync(file)) continue; - const fileStream = fs.createReadStream(file, { encoding: "utf8" }); const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); diff --git a/apps/metrics/src/index.js b/apps/metrics/src/index.js index 9916fce8..b1da5a76 100644 --- a/apps/metrics/src/index.js +++ b/apps/metrics/src/index.js @@ -9,7 +9,6 @@ import { MODE, ACTION, MONITOR } from "./utils/constants.js" async function main() { const cfg = loadConfig() - const ensureDir = dir => { if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }) } ensureDir(cfg.server.data_dir) @@ -26,7 +25,7 @@ async function main() { const client = createClient({ url }) client.on("error", err => console.error("valkey error", err)) await client.connect() - const stoppers = await setupCollectors(client) + const stoppers = await setupCollectors(client, cfg) const app = express() @@ -82,7 +81,7 @@ async function main() { if (monitorRunning) { return { monitorRunning } } - await startMonitor() + await startMonitor(cfg) monitorRunning = true checkAt = Date.now() + monitorDuration return { monitorRunning, checkAt} @@ -122,12 +121,12 @@ async function main() { return res.json(monitorResponse) } if (Date.now() > checkAt) { - const hotkeys = await calculateHotKeys() + const hotKeys = await calculateHotKeys() if (req.query.mode !== MODE.CONTINUOUS) { await monitorHandler(ACTION.STOP) } monitorResponse = await monitorHandler(ACTION.STATUS) - return res.json({ nodeId: url, hotkeys, ...monitorResponse }) + return res.json({ nodeId: url, hotKeys, ...monitorResponse }) } return res.json({ checkAt }) diff --git a/apps/metrics/src/init-collectors.js b/apps/metrics/src/init-collectors.js index 0935a26c..eeaef2d1 100644 --- a/apps/metrics/src/init-collectors.js +++ b/apps/metrics/src/init-collectors.js @@ -2,13 +2,11 @@ import { makeFetcher } from "./effects/fetchers.js" import { makeMonitorStream } from "./effects/monitor-stream.js" import { makeNdjsonWriter } from "./effects/ndjson-writer.js" import { startCollector } from "./epics/collector-rx.js" -import { loadConfig } from "./config.js" -const cfg = loadConfig() const MONITOR = "monitor" const stoppers = {} -const startMonitor = () => { +const startMonitor = (cfg) => { const nd = makeNdjsonWriter({ dataDir: cfg.server.data_dir, filePrefix: MONITOR @@ -17,7 +15,7 @@ const startMonitor = () => { const sink = { appendRows: async rows => { await nd.appendRows(rows) - console.info(`[${monitorEpic.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/${monitorEpic.file_prefix || monitorEpic.name}`) + console.info(`[${monitorEpic.name}] wrote ${rows.length} logs to ${cfg.server.data_dir}/`) }, close: nd.close } @@ -38,7 +36,7 @@ const startMonitor = () => { const stopMonitor = async () => await stoppers[MONITOR]() -const setupCollectors = async client => { +const setupCollectors = async (client, cfg) => { const fetcher = makeFetcher(client) const stoppers = {} await Promise.all(cfg.epics diff --git a/apps/metrics/src/utils/constants.js b/apps/metrics/src/utils/constants.js index c4c890ef..3db4a983 100644 --- a/apps/metrics/src/utils/constants.js +++ b/apps/metrics/src/utils/constants.js @@ -8,4 +8,4 @@ export const ACTION = { START: "start", STOP: "stop", STATUS: "status", -} +} \ No newline at end of file diff --git a/apps/server/src/actions/hotkeys.ts b/apps/server/src/actions/hotkeys.ts new file mode 100644 index 00000000..eb2588a4 --- /dev/null +++ b/apps/server/src/actions/hotkeys.ts @@ -0,0 +1,72 @@ +import { type WebSocket } from "ws" +import { VALKEY } from "../../../../common/src/constants" +import { withDeps, Deps } from "./utils" + +type HotKeysResponse = { + nodeId: string + hotkeys: [[]] + checkAt: number + monitorRunning: boolean +} + +const sendHotKeysFulfilled = ( + ws: WebSocket, + connectionId: string, + parsedResponse: HotKeysResponse, +) => { + ws.send( + JSON.stringify({ + type: VALKEY.HOTKEYS.hotKeysFulfilled, + payload: { + connectionId, + parsedResponse, + }, + }), + ) +} + +const sendHotKeysError = ( + ws: WebSocket, + connectionId: string, + error: unknown, +) => { + console.log(error) + ws.send( + JSON.stringify({ + type: VALKEY.HOTKEYS.hotKeysError, + payload: { + connectionId, + error: error instanceof Error ? error.message : String(error), + }, + }), + ) +} + +export const hotKeysRequested = withDeps( + async ({ ws, connectionId, metricsServerURIs }) => { + const metricsServerURI = metricsServerURIs.get(connectionId) + try { + const initialResponse = await fetch(`${metricsServerURI}/hot-keys`) + const initialParsedResponse: HotKeysResponse = await initialResponse.json() as HotKeysResponse + // Initial request starts monitoring and returns when to fetch results (`checkAt`). + if (initialParsedResponse.checkAt) { + const delay = Math.max(initialParsedResponse.checkAt - Date.now(), 0) + // Schedule the follow-up request for when the monitor cycle finishes + setTimeout(async () => { + try { + const dataResponse = await fetch(`${metricsServerURI}/hot-keys`) + const dataParsedResponse = await dataResponse.json() as HotKeysResponse + sendHotKeysFulfilled(ws, connectionId, dataParsedResponse) + } catch (error) { + sendHotKeysError(ws, connectionId, error) + } + }, delay) + } + else { + sendHotKeysFulfilled(ws, connectionId, initialParsedResponse) + } + } catch (error) { + sendHotKeysError(ws, connectionId, error) + } + }, +) diff --git a/apps/server/src/actions/utils.ts b/apps/server/src/actions/utils.ts index 20ee559c..70fd4c79 100644 --- a/apps/server/src/actions/utils.ts +++ b/apps/server/src/actions/utils.ts @@ -4,7 +4,8 @@ import type WebSocket from "ws" export type Deps = { ws: WebSocket clients: Map - connectionId: string + connectionId: string, + metricsServerURIs: Map, } export type ReduxAction = { diff --git a/apps/server/src/calculateHotKeys.ts b/apps/server/src/calculateHotKeys.ts deleted file mode 100644 index e69de29b..00000000 diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index cb97e3c8..488231b4 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -12,12 +12,22 @@ import { getKeyTypeRequested, updateKeyRequested } from "./actions/keys.ts" +import { hotKeysRequested } from "./actions/hotkeys.ts" import { Handler, ReduxAction, unknownHandler, type WsActionMessage } from "./actions/utils.ts" +interface MetricsServerMessage { + type: string + payload: { + metricsHost: string + metricsPort: number + serverConnectionId: string + } +} + const wss = new WebSocketServer({ port: 8080 }) const metricsServerURIs: Map = new Map() -process.on("message", (message) => { +process.on("message", (message: MetricsServerMessage ) => { if (message?.type === "metrics-started") { const metricsServerURI = `${message.payload.metricsHost}:${message.payload.metricsPort}` metricsServerURIs.set(message.payload.serverConnectionId, metricsServerURI) @@ -65,10 +75,11 @@ wss.on("connection", (ws: WebSocket) => { [VALKEY.KEYS.deleteKeyRequested]: deleteKeyRequested, [VALKEY.KEYS.addKeyRequested]: addKeyRequested, [VALKEY.KEYS.updateKeyRequested]: updateKeyRequested, + [VALKEY.HOTKEYS.hotKeysRequested]: hotKeysRequested, } const handler = handlers[action!.type] ?? unknownHandler - await handler({ ws, clients, connectionId: connectionId! })(action as ReduxAction) + await handler({ ws, clients, connectionId: connectionId!, metricsServerURIs })(action as ReduxAction) }) ws.on("error", (err) => { console.error("WebSocket error:", err) diff --git a/common/src/constants.ts b/common/src/constants.ts index 75c8316c..3163ae9d 100644 --- a/common/src/constants.ts +++ b/common/src/constants.ts @@ -61,6 +61,11 @@ export const VALKEY = { deleteCluster: "deleteCluster", setClusterData: "setClusterData", } as const), + HOTKEYS: makeNamespace( "hotKeys",{ + hotKeysRequested: "hotKeysRequested", + hotKeysFulfilled: "hotKeysFulfilled", + hotKeysError: "hotKeysError", + }), } as const export const CONNECTED = "Connected" diff --git a/install-mac-build.sh b/install-mac-build.sh new file mode 100755 index 00000000..64f440d3 --- /dev/null +++ b/install-mac-build.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +echo "Closing existing Skyscope app if running…" +osascript -e 'tell application "Skyscope" to quit' || true + +set -e + +echo "Packaging mac build…" +npm run package:mac + +DMG_PATH=$(ls release/*.dmg | head -n 1) + +if [ -z "$DMG_PATH" ]; then + echo "No DMG found in release/" + exit 1 +fi + +MOUNT_POINT=$(hdiutil attach "$DMG_PATH" | grep Volumes | awk '{for(i=3;i<=NF;i++) printf "%s%s",$i,(i