Skip to content

Commit 84e2942

Browse files
Ravjot BrarRavjot Brar
authored andcommitted
Address review comments
1 parent b613418 commit 84e2942

File tree

9 files changed

+154
-50
lines changed

9 files changed

+154
-50
lines changed

apps/metrics/src/index.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,10 @@ async function main() {
145145
const port = Number(cfg.server.port || 0)
146146
const backendServerHost = process.env.SERVER_HOST ?? "localhost"
147147
const backendServerPort = process.env.SERVER_PORT ?? "8080"
148+
const metricsServerHost = process.env.METRICS_HOST ?? "127.0.0.1"
148149
const server = app.listen(port, async () => {
149150
const assignedPort = server.address().port
150-
console.debug(`listening on http://0.0.0.0:${assignedPort}`)
151+
console.debug(`listening on http://${metricsServerHost}:${assignedPort}`)
151152
try {
152153
const registerURI = `http://${backendServerHost}:${backendServerPort}/orchestrator/register`
153154
console.debug("Sending Register request to ", registerURI)
@@ -156,7 +157,7 @@ async function main() {
156157
method: "POST",
157158
headers: { "Content-Type": "application/json" },
158159
body: JSON.stringify({
159-
metricsServerUri: `http://0.0.0.0:${assignedPort}`,
160+
metricsServerUri: `http://${metricsServerHost}:${assignedPort}`,
160161
pid: process.pid,
161162
nodeId: ownConnectionId,
162163
}),

apps/server/src/__tests__/connection.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ describe("connectToValkey", () => {
3939
metricsServerMap.set(DEFAULT_PAYLOAD.connectionId, {
4040
metricsURI: "http://localhost:1234",
4141
pid: 12345,
42-
lastSeen: Date.now().toString(),
42+
lastSeen: Date.now(),
4343
})
4444
})
4545

apps/server/src/__tests__/metrics-orchestrator.test.ts

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11

22
import { describe, it, beforeEach, afterEach, mock } from "node:test"
33
import assert from "node:assert"
4+
import { GlideClient } from "@valkey/valkey-glide"
45
import {
56
metricsServerMap,
67
stopAllMetricsServers,
78
reconcileClusterMetricsServers,
9+
clients,
810
__test__
911
} from "../metrics-orchestrator"
1012
import type { ClusterNodeMap, MetricsServerMap } from "../metrics-orchestrator"
@@ -22,23 +24,22 @@ const clusterNodesRegistry = {
2224
}
2325

2426
describe("metrics-orchestrator", () => {
25-
beforeEach(() => {
26-
metricsServerMap.clear()
27-
})
28-
29-
afterEach(() => {
30-
mock.restoreAll()
31-
metricsServerMap.clear()
32-
})
33-
3427
describe("findDiff", () => {
28+
let client: GlideClient
29+
beforeEach(() => {
30+
client = {} as GlideClient
31+
})
32+
afterEach(() => {
33+
mock.restoreAll()
34+
metricsServerMap.clear()
35+
})
3536
it("should return nodes to add if not in metricsMap", async () => {
3637
const clusterNodes: ClusterNodeMap = {
3738
node1: { host: "127.0.0.1", port: "6379", tls: false, verifyTlsCertificate: false },
3839
node2: { host: "127.0.0.2", port: "6379", tls: false, verifyTlsCertificate: false },
3940
}
4041
const metricsMap: MetricsServerMap = new Map([
41-
["node1", { metricsURI: "uri", pid: 123, lastSeen: Date.now().toString() }],
42+
["node1", { metricsURI: "uri", pid: 123, lastSeen: Date.now() }],
4243
])
4344
const { nodesToAdd, nodesToRemove } = await __test__.findDiff(metricsMap, clusterNodes)
4445
assert.strictEqual(Object.keys(nodesToAdd).length, 1)
@@ -52,8 +53,8 @@ describe("metrics-orchestrator", () => {
5253
}
5354
const now = Date.now()
5455
const metricsMap: MetricsServerMap = new Map([
55-
["node1", { metricsURI: "uri", pid: 123, lastSeen: now.toString() }],
56-
["node2", { metricsURI: "uri", pid: 456, lastSeen: now.toString() }],
56+
["node1", { metricsURI: "uri", pid: 123, lastSeen: now }],
57+
["node2", { metricsURI: "uri", pid: 456, lastSeen: now }],
5758
])
5859
const { nodesToAdd, nodesToRemove } = await __test__.findDiff(metricsMap, clusterNodes)
5960
assert.strictEqual(Object.keys(nodesToAdd).length, 0)
@@ -65,17 +66,41 @@ describe("metrics-orchestrator", () => {
6566
const clusterNodes: ClusterNodeMap = {
6667
node1: { host: "127.0.0.1", port: "6379", tls: false, verifyTlsCertificate: false },
6768
}
68-
const pastTime = (Date.now() - 100000).toString()
69+
const pastTime = (Date.now() - 100000)
6970
const metricsMap: MetricsServerMap = new Map([
7071
["node1", { metricsURI: "uri", pid: 123, lastSeen: pastTime }],
7172
])
7273
const { nodesToAdd, nodesToRemove } = await __test__.findDiff(metricsMap, clusterNodes)
7374
assert.strictEqual(nodesToAdd.node1, undefined)
7475
assert.strictEqual(nodesToRemove.includes("node1"), true)
7576
})
77+
it("should NOT remove nodes that exist in clients even if not in clusterMap", async () => {
78+
const now = Date.now()
79+
80+
const metricsMap: MetricsServerMap = new Map([
81+
["node1", { metricsURI: "uri", pid: 123, lastSeen: now }],
82+
])
83+
84+
const clusterNodes: ClusterNodeMap = {
85+
// node1 intentionally missing
86+
}
87+
88+
// simulate active client for node1
89+
clients.set("node1", { client })
90+
91+
const { nodesToAdd, nodesToRemove } = await __test__.findDiff(metricsMap, clusterNodes)
92+
93+
// should NOT be removed because it's still in clients
94+
assert.strictEqual(nodesToRemove.includes("node1"), false)
95+
assert.strictEqual(Object.keys(nodesToAdd).length, 0)
96+
})
7697
})
7798

7899
describe("startMetricsServer / stopMetricsServer", () => {
100+
afterEach(() => {
101+
mock.restoreAll()
102+
metricsServerMap.clear()
103+
})
79104
it("should spawn a new metrics server", async () => {
80105
const nodes = {
81106
host: "127.0.0.1",
@@ -90,7 +115,7 @@ describe("metrics-orchestrator", () => {
90115
async (nodesMap: Record<string, ConnectionDetails>) => {
91116
// simulate inserting all nodes into metricsServerMap
92117
for (const [key, node] of Object.entries(nodesMap)) {
93-
metricsServerMap.set(key, { metricsURI: node.host, pid: 999, lastSeen: "123" })
118+
metricsServerMap.set(key, { metricsURI: node.host, pid: 999, lastSeen: 123 })
94119
}
95120
},
96121
)
@@ -104,7 +129,7 @@ describe("metrics-orchestrator", () => {
104129

105130
it("should stop a metrics server by killing pid", async () => {
106131
let killedPid: number | undefined
107-
metricsServerMap.set("node1", { metricsURI: "uri", pid: 1234, lastSeen: Date.now().toString() })
132+
metricsServerMap.set("node1", { metricsURI: "uri", pid: 1234, lastSeen: Date.now() })
108133
mock.method(process, "kill", (pid: number) => {
109134
killedPid = pid
110135
})
@@ -115,8 +140,8 @@ describe("metrics-orchestrator", () => {
115140
})
116141
it("should kill all metrics servers and clear the map safely", async () => {
117142
const killed: number[] = []
118-
metricsServerMap.set("node1", { metricsURI: "uri", pid: 1, lastSeen: "1" })
119-
metricsServerMap.set("node2", { metricsURI: "uri", pid: 2, lastSeen: "2" })
143+
metricsServerMap.set("node1", { metricsURI: "uri", pid: 1, lastSeen: 1 })
144+
metricsServerMap.set("node2", { metricsURI: "uri", pid: 2, lastSeen: 2 })
120145
mock.method(process, "kill", (pid: number) => killed.push(pid))
121146

122147
await stopAllMetricsServers(metricsServerMap)
@@ -128,10 +153,12 @@ describe("metrics-orchestrator", () => {
128153

129154
describe("reconcileClusterMetricsServers", () => {
130155
let connectionDetails: ConnectionDetails
156+
let client: GlideClient
131157

132158
beforeEach(() => {
133159
metricsServerMap.clear()
134160
connectionDetails = { host: "127.0.0.1", port: "6379", tls: false, verifyTlsCertificate: false }
161+
client = {} as GlideClient
135162

136163
// Mock all side-effectful internal functions
137164
mock.method(__test__, "connectToInitialValkeyNode", async () => ({}))
@@ -145,17 +172,22 @@ describe("metrics-orchestrator", () => {
145172
mock.method(__test__, "updateMetricsServers", async () => {})
146173
mock.method(__test__, "findDiff", async () => ({ nodesToAdd: {}, nodesToRemove: [] }))
147174
})
175+
afterEach(() => {
176+
mock.restoreAll()
177+
})
148178

149179
it("should discover cluster if registry is empty", async () => {
150-
await reconcileClusterMetricsServers(clusterNodesRegistry, metricsServerMap, connectionDetails)
180+
await reconcileClusterMetricsServers(
181+
clusterNodesRegistry,metricsServerMap, connectionDetails, client)
151182
assert.ok(clusterNodesRegistry["cluster-1"])
152183
})
153184

154185
it("should call updateClusterNodeRegistry for existing clusters", async () => {
155186
clusterNodesRegistry["cluster-1"] = {
156187
node1: { host: "127.0.0.1", port: 6379, tls: false, verifyTlsCertificate: false },
157188
}
158-
await reconcileClusterMetricsServers(clusterNodesRegistry, metricsServerMap, connectionDetails)
189+
await reconcileClusterMetricsServers(
190+
clusterNodesRegistry,metricsServerMap, connectionDetails, client)
159191
// Nothing should throw; mocks handle all calls
160192
})
161193

@@ -165,7 +197,8 @@ describe("metrics-orchestrator", () => {
165197
clusterNodesRegistry["cluster-1"] = {
166198
node1: { host: "127.0.0.1", port: 6379, tls: false, verifyTlsCertificate: false },
167199
}
168-
await reconcileClusterMetricsServers(clusterNodesRegistry, metricsServerMap, connectionDetails)
200+
await reconcileClusterMetricsServers(
201+
clusterNodesRegistry,metricsServerMap, connectionDetails, client)
169202
// updateMetricsServers should not be called because nothing changed
170203
})
171204
})

apps/server/src/index.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ import {
2929
clusterNodesRegistry,
3030
initialConnectionDetails,
3131
cleanupOrchestratorResources,
32-
clients
32+
clients,
33+
getInitialClient
3334
} from "./metrics-orchestrator"
3435
import type { Request, Response } from "express"
3536

@@ -74,9 +75,10 @@ const wss = new WebSocketServer({ server })
7475
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))
7576

7677
async function runReconcileLoop() {
78+
const initialClient = await getInitialClient()
7779
while (true) {
7880
try {
79-
await reconcileClusterMetricsServers(clusterNodesRegistry, metricsServerMap, initialConnectionDetails)
81+
await reconcileClusterMetricsServers(clusterNodesRegistry, metricsServerMap, initialConnectionDetails, initialClient)
8082
await delay(5000)
8183
} catch (err) {
8284
console.error("Failed to reconcile metrics servers", err)
@@ -90,7 +92,7 @@ server.listen(port, () => {
9092
if (process.send) { // Check if process.send is available (i.e., if forked)
9193
process.send({ type: "websocket-ready" }) // Send a ready message to the parent process
9294
}
93-
if (process.env.USE_ORCHESTRATOR === "true") {
95+
if (process.env.USE_CLUSTER_ORCHESTRATOR === "true") {
9496
runReconcileLoop()
9597
}
9698
})

0 commit comments

Comments
 (0)