Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ out/
app/
packages/

tools/valkey-metrics/data/
apps/metrics/data/
78 changes: 74 additions & 4 deletions apps/frontend/electron.main.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const { app, BrowserWindow, Menu } = require('electron');
const { app, BrowserWindow } = require('electron');
const path = require('path');
const { fork } = require('child_process');

let serverProcess;
let metricsProcesses = new Map();

function startServer() {
if (app.isPackaged) {
Expand All @@ -19,6 +20,61 @@ function startServer() {
}
}

function startMetrics(serverConnectionId, serverConnectionDetails) {
const dataDir = path.join(app.getPath('userData'), 'metrics-data', serverConnectionId);

let metricsServerPath;
let configPath;

if (app.isPackaged) {
metricsServerPath = path.join(process.resourcesPath, 'server-metrics.js');
configPath = path.join(process.resourcesPath, 'config.yml'); // Path for production
} else {
metricsServerPath = path.join(__dirname, '../../metrics/src/index.js');
configPath = path.join(__dirname, '../../metrics/config.yml'); // Path for development
}

console.log(`Starting metrics server for connection ${serverConnectionId}...`);

const metricsProcess = fork(metricsServerPath, [], {
env: {
...process.env,
PORT: 0,
DATA_DIR: dataDir,
VALKEY_URL: `valkey://${serverConnectionDetails.host}:${serverConnectionDetails.port}`,
CONFIG_PATH: configPath // Explicitly provide the config path
}
});

metricsProcesses.set(serverConnectionId, metricsProcess);

metricsProcess.on('message', (message) => {
if (message && message.type === 'metrics-started') {
console.log(`Metrics server for ${serverConnectionId} started successfully on host: ${message.payload.metricsHost} port ${message.payload.metricsPort}`);
}
});

metricsProcess.on('close', (code) => {
console.log(`Metrics server for connection ${serverConnectionId} exited with code ${code}`);
metricsProcesses.delete(serverConnectionId);
});

metricsProcess.on('error', (err) => {
console.error(`Metrics server for connection ${serverConnectionId} error: ${err}`);
});
}

// Disconnect functionality in the server has not been implemented. Once that is implemented, this can be used.
function stopMetricServer(serverConnectionId) {
metricsProcesses.get(serverConnectionId).kill();
}

function stopMetricServers() {
metricsProcesses.forEach((_serverConnectionId, metricProcess) => {
metricProcess.kill();
})
}

function createWindow() {
const win = new BrowserWindow({
width: 1200,
Expand All @@ -31,7 +87,6 @@ function createWindow() {

if (app.isPackaged) {
win.loadFile(path.join(__dirname, 'dist', 'index.html'));
//win.webContents.openDevTools(); // open DevTools for debugging
} else {
win.loadURL('http://localhost:5173');
win.webContents.openDevTools();
Expand All @@ -42,8 +97,20 @@ app.whenReady().then(() => {
startServer();
if (serverProcess) {
serverProcess.on('message', (message) => {
if (message === 'ready') {
createWindow();
switch (message.type) {
case 'websocket-ready':
createWindow();
break;
case 'valkeyConnection/standaloneConnectFulfilled':
startMetrics(message.payload.connectionId, message.payload.connectionDetails);
break;
default:
try {
console.log(`Received unknown server message: ${JSON.stringify(message)}`);
} catch (_) {
console.log(`Received unknown server message: ${message}`);
}

}
});
} else {
Expand All @@ -61,6 +128,9 @@ app.on('before-quit', () => {
if (serverProcess) {
serverProcess.kill();
}
if (metricsProcesses.length > 0) {
stopMetricServers();
}
});

app.on('activate', () => {
Expand Down
2 changes: 1 addition & 1 deletion apps/frontend/src/state/epics/rootEpic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { Store } from "@reduxjs/toolkit"
export const registerEpics = (store: Store) => {
merge(
wsConnectionEpic(store),
connectionEpic(store),
connectionEpic(),
autoReconnectEpic(store),
valkeyRetryEpic(store),
deleteConnectionEpic(),
Expand Down
47 changes: 31 additions & 16 deletions apps/frontend/src/state/epics/valkeyEpics.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import { merge, timer, EMPTY } from "rxjs"
import { ignoreElements, tap, delay, switchMap, catchError } from "rxjs/operators"
import { ignoreElements, tap, delay, switchMap, catchError, filter } from "rxjs/operators"
import * as R from "ramda"
import { DISCONNECTED, LOCAL_STORAGE, NOT_CONNECTED, RETRY_CONFIG, retryDelay } from "@common/src/constants.ts"
import { toast } from "sonner"
import { getSocket } from "./wsEpics"
import { connectFulfilled, connectPending, deleteConnection, connectRejected, startRetry, stopRetry }
from "../valkey-features/connection/connectionSlice"
import {
standaloneConnectFulfilled,
clusterConnectFulfilled,
connectPending,
deleteConnection,
connectRejected,
startRetry,
stopRetry
} from "../valkey-features/connection/connectionSlice"
import { sendRequested } from "../valkey-features/command/commandSlice"
import { setData } from "../valkey-features/info/infoSlice"
import { action$, select } from "../middleware/rxjsMiddleware/rxjsMiddlware"
import { setClusterData } from "../valkey-features/cluster/clusterSlice"
import { connectFulfilled as wsConnectFulfilled } from "../wsconnection/wsConnectionSlice"
import history from "../../history.ts"
import type { Store } from "@reduxjs/toolkit"
import { atId } from "@/state/valkey-features/connection/connectionSelectors.ts"

export const connectionEpic = (store: Store) =>
export const connectionEpic = () =>
merge(
action$.pipe(
select(connectPending),
Expand All @@ -28,21 +34,24 @@ export const connectionEpic = (store: Store) =>
),

action$.pipe(
select(connectFulfilled),
tap(({ payload: { connectionId } }) => {
select(standaloneConnectFulfilled),
tap(({ payload }) => {
try {
const currentConnections = R.pipe(
(v: string) => localStorage.getItem(v),
(s) => (s === null ? {} : JSON.parse(s)),
)(LOCAL_STORAGE.VALKEY_CONNECTIONS)

R.pipe( // merge fulfilled connection with existing connections in localStorage
atId(connectionId),
R.evolve({ status: R.always(NOT_CONNECTED) }),
R.assoc(connectionId, R.__, currentConnections),
R.pipe(
(p) => ({
connectionDetails: p.connectionDetails,
status: NOT_CONNECTED,
}),
(connectionToSave) => ({ ...currentConnections, [payload.connectionId]: connectionToSave }),
JSON.stringify,
(updated) => localStorage.setItem(LOCAL_STORAGE.VALKEY_CONNECTIONS, updated),
)(store.getState())
)(payload)

toast.success("Connected to server successfully!")
} catch (e) {
toast.error("Connection to server failed!")
Expand Down Expand Up @@ -166,9 +175,8 @@ export const deleteConnectionEpic = () =>
R.dissoc(connectionId),
JSON.stringify,
(updated) => localStorage.setItem(LOCAL_STORAGE.VALKEY_CONNECTIONS, updated),
() => toast.success("Connection removed successfully!"),
)(currentConnections)

toast.success("Connection removed successfully!")
} catch (e) {
toast.error("Failed to remove connection!")
console.error(e)
Expand All @@ -187,13 +195,20 @@ export const sendRequestEpic = () =>

export const setDataEpic = () =>
action$.pipe(
select(connectFulfilled),
filter(
({ type }) =>
type === standaloneConnectFulfilled.type ||
type === clusterConnectFulfilled.type,
),
tap((action) => {
const socket = getSocket()

const { clusterId, connectionId } = action.payload
if (clusterId) {

if (action.type === clusterConnectFulfilled.type) {
socket.next({ type: setClusterData.type, payload: { clusterId, connectionId } })
}

socket.next({ type: setData.type, payload: { connectionId } })
const dashboardPath = clusterId
? `/${clusterId}/${connectionId}/dashboard`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,28 @@ const connectionSlice = createSlice({
}),
}
},
connectFulfilled: (state, action) => {
standaloneConnectFulfilled: (
state,
action: PayloadAction<{
connectionId: string;
connectionDetails: { host: string; port: number};
}>,
) => {
const { connectionId } = action.payload
const connectionState = state.connections[connectionId]
if (connectionState) {
connectionState.status = CONNECTED
connectionState.errorMessage = null
}
},
clusterConnectFulfilled: (
state,
action: PayloadAction<{
connectionId: string;
clusterNodes: Record<string, ConnectionDetails>;
clusterId: string;
}>,
) => {
const { connectionId, clusterNodes, clusterId } = action.payload
const connectionState = state.connections[connectionId]
if (connectionState) {
Expand Down Expand Up @@ -140,10 +161,11 @@ const connectionSlice = createSlice({
})

export default connectionSlice.reducer
export const {
connectPending,
connectFulfilled,
connectRejected,
export const {
connectPending,
standaloneConnectFulfilled,
clusterConnectFulfilled,
connectRejected,
connectionBroken,
closeConnection,
updateConnectionDetails,
Expand Down
File renamed without changes.
File renamed without changes.
14 changes: 14 additions & 0 deletions apps/metrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Timestream Metrics

## Overview
Standalone service to poll info, memory and other data from Valkey cluster and store in files to produce timeseries of CPU load changes and other metrics trends over time.

## Getting started
**Note:** valkey-cluster must be running before the following steps.

`docker compose up --build`

To examine the metrics with curl:
`curl -s http://localhost:3000/slowlog | jq`
`curl -s http://localhost:3000/cpu | jq`
`curl -s http://localhost:3000/memory | jq`
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ valkey:
# url: "valkey://:password@my-cluster.example.com:6379"

server:
port: 3000
port: 0
data_dir: "/app/data"

collector:
Expand Down
Loading
Loading