Skip to content

Commit efff6e0

Browse files
ravjotbrarRavjot Brar
andauthored
Fix issues with ElastiCache deployments (#249)
* Make server host and port configurable through env variables * Fix lint issues Signed-off-by: Ravjot Brar <Ravjot.Brar@VAN-FM2FQ6NHXT.local> * Remove window from metrics server Signed-off-by: Ravjot Brar <ravjot.brar@improving.com> * Replace host and port vars with ws_url * Get frontend to infer ws protocol * Changed name of ws env variable * Add alternatives to CONFIG commands * Make sure url doesn't rely on checking if browser * Change name of ws_url variable * Add cluster slot stats warning * Fix ws url generation * Comment out enableClusterSlotStats * MODULE also restricted * Add fix for idle timeout * Add interval time * Address review and remove unnecessary comments --------- Signed-off-by: Ravjot Brar <Ravjot.Brar@VAN-FM2FQ6NHXT.local> Signed-off-by: Ravjot Brar <ravjot.brar@improving.com> Co-authored-by: Ravjot Brar <Ravjot.Brar@VAN-FM2FQ6NHXT.local>
1 parent 7ce307c commit efff6e0

File tree

6 files changed

+58
-37
lines changed

6 files changed

+58
-37
lines changed

apps/frontend/src/state/epics/valkeyEpics.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ export const getHotKeysEpic = (store: Store) =>
343343
const state = store.getState()
344344
const connection = state.valkeyConnection.connections[connectionId]
345345
const monitorEnabled = state.config[connectionId].monitoring.monitorEnabled
346-
const lfuEnabled = connection.connectionDetails.keyEvictionPolicy.includes("lfu") ?? false
346+
const lfuEnabled = connection.connectionDetails.keyEvictionPolicy?.includes("lfu") ?? false
347347
const clusterSlotStatsEnabled = connection.connectionDetails.clusterSlotStatsEnabled ?? false
348348

349349
socket.next({

apps/frontend/src/state/epics/wsEpics.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import {
2626

2727
let socket$: WebSocketSubject<PayloadAction> | null = null
2828

29-
const isBrowser = typeof window !== "undefined"
29+
const isElectron = window.location.protocol === "file:"
30+
const isHttps = window.location.protocol === "https:"
3031

3132
const url =
32-
process.env.WS_URL ||
33-
(isBrowser
34-
? `ws://${window.location.hostname}:${window.location.port || "8080"}`
35-
: "ws://localhost:8080")
33+
process.env.VALKEY_ADMIN_WS_URL ||
34+
(isElectron
35+
? "ws://localhost:8080"
36+
: `${isHttps ? "wss" : "ws"}://${window.location.host}`)
3637

3738
const connect = (store: Store) =>
3839
action$.pipe(

apps/server/src/__tests__/connection.test.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ describe("connectToValkey", () => {
144144
) {
145145
return [{ key: "maxmemory-policy", value: "allkeys-lfu" }]
146146
}
147-
147+
else if (
148+
Array.isArray(args) &&
149+
args[0] === "JSON.TYPE"
150+
) throw Error
148151
// default response for other commands
149152
return []
150153
}),
@@ -301,9 +304,7 @@ describe("connectToValkey", () => {
301304

302305
it("should return false when JSON module is not present", async () => {
303306
const mockClient = {
304-
customCommand: mock.fn(async () => [
305-
[{ key: "name", value: "search" }],
306-
]),
307+
customCommand: mock.fn(async () => { throw Error }),
307308
}
308309

309310
const result = await checkJsonModuleAvailability(mockClient as any)
Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,14 @@
11
import { GlideClient, GlideClusterClient } from "@valkey/valkey-glide"
22

3-
type ModuleAttribute = {
4-
key: string
5-
value: unknown
6-
}
7-
8-
type Module = ModuleAttribute[]
9-
103
export async function checkJsonModuleAvailability(
114
client: GlideClient | GlideClusterClient,
125
): Promise<boolean> {
136
try {
14-
const modules = await client.customCommand(["MODULE", "LIST"]) as Module[]
15-
16-
return modules.some((module) => {
17-
const moduleName = module.find((attr) => attr.key === "name")?.value
18-
19-
return typeof moduleName === "string" &&
20-
(moduleName.toLowerCase().includes("json") ||
21-
moduleName.toLowerCase().includes("rejson"))
22-
})
23-
} catch (err) {
24-
console.error("Error checking JSON module availability:", err)
7+
// Elasticache restricts MODULE command
8+
await client.customCommand(["JSON.TYPE", "nonexistent_key"])
9+
return true
10+
} catch {
2511
return false
2612
}
2713
}
14+

apps/server/src/connection.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,24 @@ export async function connectToValkey(
6161
// Only start metrics server if it hasn't been started before
6262
if (!metricsServerMap.has(payload.connectionId)) await startMetricsServer(payload.connectionDetails, payload.connectionId)
6363

64-
const evictionPolicyResponse = await standaloneClient.customCommand(["CONFIG", "GET", "maxmemory-policy"]) as [{key: string, value: string}]
65-
const keyEvictionPolicy: KeyEvictionPolicy = evictionPolicyResponse[0].value.toLowerCase() as KeyEvictionPolicy
64+
let keyEvictionPolicy: KeyEvictionPolicy = "noeviction"
65+
try {
66+
const evictionPolicyResponse = await standaloneClient.customCommand(
67+
["CONFIG", "GET", "maxmemory-policy"],
68+
) as [{key: string, value: string}]
69+
70+
keyEvictionPolicy = R.pipe(
71+
R.pathOr("noeviction", [0, "value"]),
72+
R.toLower,
73+
)(evictionPolicyResponse) as KeyEvictionPolicy
74+
} catch {
75+
console.warn("Command \"CONFIG\" not available. Trying \"INFO SERVER\" instead")
76+
const infoResponse = await standaloneClient.info([InfoOptions.Server])
77+
const parsed = parseInfo(infoResponse)
78+
if (parsed["maxmemory_policy"]) {
79+
keyEvictionPolicy = parsed["maxmemory_policy"].toLowerCase() as KeyEvictionPolicy
80+
}
81+
}
6682
const jsonModuleAvailable = await checkJsonModuleAvailability(standaloneClient)
6783

6884
if (await belongsToCluster(standaloneClient)) {
@@ -185,7 +201,6 @@ async function connectToCluster(
185201
jsonModuleAvailable: boolean,
186202
clusterNodesMap: Map<string, string[]>,
187203
) {
188-
await standaloneClient.customCommand(["CONFIG", "SET", "cluster-announce-hostname", addresses[0].host])
189204
const { clusterNodes, clusterId } = await discoverCluster(standaloneClient, payload)
190205
if (R.isEmpty(clusterNodes)) {
191206
throw new Error("No cluster nodes discovered")
@@ -241,10 +256,13 @@ async function connectToCluster(
241256
clusterNodesMap.set(clusterId, [payload.connectionId])
242257
}
243258

244-
const clusterSlotStatsResponse = await clusterClient.customCommand(
245-
["CONFIG", "GET", "cluster-slot-stats-enabled"],
246-
) as [Record<string, string>]
247-
const clusterSlotStatsEnabled = clusterSlotStatsResponse[0].value === "yes"
259+
let clusterSlotStatsEnabled = false
260+
try {
261+
await clusterClient.customCommand(["CLUSTER", "SLOT-STATS", "SLOTSRANGE", "0", "0"])
262+
clusterSlotStatsEnabled = true
263+
} catch {
264+
console.warn("Cluster slot-stats is not enabled.")
265+
}
248266

249267
const clusterConnectionInfo = {
250268
type: VALKEY.CONNECTION.clusterConnectFulfilled,

apps/server/src/index.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ import {
3434
} from "./metrics-orchestrator"
3535
import type { Request, Response } from "express"
3636

37+
interface AliveWebSocket extends WebSocket {
38+
isAlive: boolean
39+
}
40+
3741
interface MetricsServerMessage {
3842
type: string
3943
payload: {
@@ -102,8 +106,18 @@ server.listen(port, () => {
102106
}
103107
})
104108

105-
wss.on("connection", (ws: WebSocket) => {
109+
const interval = setInterval(() => {
110+
wss.clients.forEach((ws) => {
111+
const aliveSocket = ws as AliveWebSocket
112+
if (aliveSocket.isAlive === false) return ws.terminate()
113+
aliveSocket.isAlive = false
114+
ws.ping()
115+
})
116+
}, 30000)
117+
wss.on("connection", (ws: AliveWebSocket) => {
106118
console.log("Client connected.")
119+
ws.isAlive = true
120+
ws.on("pong", () => {ws.isAlive = true})
107121
// This is a simplified cluster node map that stores clusterIds and their corresponding nodeIds
108122
const clusterNodesMap: Map<string, string[]> = new Map()
109123

@@ -191,7 +205,7 @@ wss.on("connection", (ws: WebSocket) => {
191205

192206
function shutdown() {
193207
console.log("Shutdown signal received")
194-
208+
clearInterval(interval)
195209
// Close websocket clients
196210
wss.clients.forEach((ws) => {
197211
try {

0 commit comments

Comments
 (0)