From 430a975427dd9e93bd57287576550ec595929e03 Mon Sep 17 00:00:00 2001 From: Henrik Nygren Date: Mon, 17 Nov 2025 17:11:23 +0200 Subject: [PATCH] Remove WebSocket server implementation and related functionality. --- backend/app.ts | 3 - .../common/userCourseProgress/saveToDB.ts | 7 - .../bin/kafkaConsumer/common/userFunctions.ts | 6 - backend/services/redis.ts | 105 +++------ backend/wsServer.ts | 205 ------------------ 5 files changed, 29 insertions(+), 297 deletions(-) delete mode 100644 backend/wsServer.ts diff --git a/backend/app.ts b/backend/app.ts index 40c8586f3..4902730b7 100644 --- a/backend/app.ts +++ b/backend/app.ts @@ -6,7 +6,6 @@ import prisma from "./prisma" import server from "./server" import knex from "./services/knex" import { attachPrismaEvents } from "./util" -import { wsListen } from "./wsServer" require("sharp") // image library sharp seems to crash without this require @@ -40,8 +39,6 @@ const startApp = async () => { httpServer.listen(4000, () => { console.log("server running on port 4000") }) - - wsListen() } } diff --git a/backend/bin/kafkaConsumer/common/userCourseProgress/saveToDB.ts b/backend/bin/kafkaConsumer/common/userCourseProgress/saveToDB.ts index 7e100024b..76ffcfa26 100644 --- a/backend/bin/kafkaConsumer/common/userCourseProgress/saveToDB.ts +++ b/backend/bin/kafkaConsumer/common/userCourseProgress/saveToDB.ts @@ -9,7 +9,6 @@ import { import { DatabaseInputError, TMCError } from "../../../../lib/errors" import { err, ok, parseTimestamp, Result } from "../../../../util" -import { MessageType, pushMessageToClient } from "../../../../wsServer" import { getUserWithRaceCondition } from "../getUserWithRaceCondition" import { KafkaContext } from "../kafkaContext" import { generateUserCourseProgress } from "./generateUserCourseProgress" @@ -152,11 +151,5 @@ export const saveToDatabase = async ( context, }) - await pushMessageToClient( - message.user_id, - message.course_id, - MessageType.PROGRESS_UPDATED, - ) - return ok("Saved to DB successfully") } diff --git a/backend/bin/kafkaConsumer/common/userFunctions.ts b/backend/bin/kafkaConsumer/common/userFunctions.ts index c74b32990..3e7126759 100644 --- a/backend/bin/kafkaConsumer/common/userFunctions.ts +++ b/backend/bin/kafkaConsumer/common/userFunctions.ts @@ -17,7 +17,6 @@ import { ensureDefinedArray, isNullish, } from "../../../util" -import { MessageType, pushMessageToClient } from "../../../wsServer" import { ExerciseCompletionPart, ServiceProgressPartType, @@ -346,11 +345,6 @@ export const createCompletion = async ({ } // TODO: this only sends the completion email for the first tier completed - await pushMessageToClient( - user.upstream_id, - course.id, - MessageType.COURSE_CONFIRMED, - ) if (course.completion_email_id) { await prisma.emailDelivery.create({ diff --git a/backend/services/redis.ts b/backend/services/redis.ts index 53d664b1d..5be8a8877 100644 --- a/backend/services/redis.ts +++ b/backend/services/redis.ts @@ -1,5 +1,5 @@ import parseJSON from "json-parse-even-better-errors" -import { createClient, createSentinel } from "redis" +import { createClient } from "redis" import * as winston from "winston" import { @@ -7,8 +7,6 @@ import { NEXUS_REFLECTION, REDIS_DB, REDIS_PASSWORD, - REDIS_SENTINEL_MASTER_NAME, - REDIS_SENTINELS, REDIS_URL, } from "../config" import { BaseContext } from "../context" @@ -24,9 +22,7 @@ const _logger = winston.createLogger({ transports: [new winston.transports.Console()], }) -type RedisClient = - | ReturnType - | ReturnType +type RedisClient = ReturnType let redisClient: RedisClient | undefined let redisSubscriberClient: RedisClient | undefined @@ -53,73 +49,34 @@ const getRedisClient = (): RedisClient | undefined => { return } - const useSentinel = REDIS_SENTINELS && REDIS_SENTINEL_MASTER_NAME - - let client: RedisClient - - if (useSentinel && REDIS_SENTINELS && REDIS_SENTINEL_MASTER_NAME) { - const sentinels = REDIS_SENTINELS.split(",").map((sentinel: string) => { - const [host, port] = sentinel.trim().split(":") - return { host, port: parseInt(port) ?? 26379 } - }) - - const sentinelClient = createSentinel({ - name: REDIS_SENTINEL_MASTER_NAME, - sentinelRootNodes: sentinels, - nodeClientOptions: { - password: REDIS_PASSWORD, - database: REDIS_DB, - socket: { - reconnectStrategy: redisReconnectStrategy(), - }, - }, - }) - - sentinelClient.on("error", (err: any) => { - _logger.error(`Redis Sentinel error`, err) - }) - sentinelClient.on("ready", () => { - _logger.info( - `Redis Sentinel connected to master: ${REDIS_SENTINEL_MASTER_NAME}`, - ) - }) - - sentinelClient.connect().catch((err: any) => { - _logger.error(`Redis Sentinel connection failed`, err) - }) - - client = sentinelClient as RedisClient - } else { - let url = (REDIS_URL && REDIS_URL.trim()) || "redis://127.0.0.1:6379" - if (url && !url.startsWith("redis://") && !url.startsWith("rediss://")) { - url = `redis://${url}` - } - if (url && !url.includes(":") && !url.includes("/")) { - url = `${url}:6379` - } - const regularClient = createClient({ - url, - password: REDIS_PASSWORD, - database: REDIS_DB, - socket: { - reconnectStrategy: redisReconnectStrategy(), - }, - }) - - regularClient.on("error", (err: any) => { - _logger.error(`Redis error`, err) - }) - regularClient.on("ready", () => { - _logger.info(`Redis connected to: ${url}`) - }) - - regularClient.connect().catch((err: any) => { - _logger.error(`Redis connection failed`, err) - }) - - client = regularClient as RedisClient + let url = (REDIS_URL && REDIS_URL.trim()) || "redis://127.0.0.1:6379" + if (url && !url.startsWith("redis://") && !url.startsWith("rediss://")) { + url = `redis://${url}` + } + if (url && !url.includes(":") && !url.includes("/")) { + url = `${url}:6379` } + const client = createClient({ + url, + password: REDIS_PASSWORD, + database: REDIS_DB, + socket: { + reconnectStrategy: redisReconnectStrategy(), + }, + }) + + client.on("error", (err: any) => { + _logger.error(`Redis error`, err) + }) + client.on("ready", () => { + _logger.info(`Redis connected to: ${url}`) + }) + + client.connect().catch((err: any) => { + _logger.error(`Redis connection failed`, err) + }) + redisClient = client return client } @@ -152,7 +109,7 @@ const getRedisSubscriberClient = (): RedisClient | undefined => { socket: { reconnectStrategy: redisReconnectStrategy("Redis Subscriber"), }, - }) as RedisClient + }) subscriber.on("error", (err: any) => { _logger.error(`Redis subscriber error`, err) @@ -161,10 +118,6 @@ const getRedisSubscriberClient = (): RedisClient | undefined => { _logger.info(`Redis subscriber connected to: ${url}`) }) - subscriber.connect().catch((err: any) => { - _logger.error(`Redis subscriber connection failed`, err) - }) - redisSubscriberClient = subscriber return redisSubscriberClient } diff --git a/backend/wsServer.ts b/backend/wsServer.ts deleted file mode 100644 index 51741a1e5..000000000 --- a/backend/wsServer.ts +++ /dev/null @@ -1,205 +0,0 @@ -import { createServer } from "http" - -import parseJSON from "json-parse-even-better-errors" -import * as WebSocketServer from "websocket" -import * as winston from "winston" - -import { isTest, NEXUS_REFLECTION } from "./config" -import { UserInfo } from "./domain/UserInfo" -import getRedisClient, { getRedisSubscriberClient } from "./services/redis" -import { getCurrentUserDetails } from "./services/tmc" - -const logger = winston.createLogger({ - level: "info", - format: winston.format.combine( - winston.format.timestamp(), - winston.format.json(), - ), - defaultMeta: { service: "wsServer" }, - transports: [new winston.transports.Console()], -}) - -const webSocketsServerPort = 9000 - -const server = createServer() - -export const wsListen = () => server.listen(webSocketsServerPort) - -const wsServer = new WebSocketServer.server({ - httpServer: server, -}) - -const connectionByUserCourse = new Map() -const userCourseByConnection = new Map() - -let subscriber: ReturnType | undefined - -export enum MessageType { - PROGRESS_UPDATED = "PROGRESS_UPDATED", - PEER_REVIEW_RECEIVED = "PEER_REVIEW_RECEIVED", - QUIZ_CONFIRMED = "QUIZ_CONFIRMED", - QUIZ_REJECTED = "QUIZ_REJECTED", - COURSE_CONFIRMED = "COURSE_CONFIRMED", -} - -export const pushMessageToClient = async ( - userId: number, - courseId: string, - type: MessageType, - payload?: string, -) => { - const userCourseObjectString = JSON.stringify({ userId, courseId }) - const connection = connectionByUserCourse.get(userCourseObjectString) - - if (connection) { - if (connection.connected) { - connection.sendUTF( - JSON.stringify({ - type, - message: payload, - }), - ) - } else { - connectionByUserCourse.delete(userCourseObjectString) - await getRedisClient()?.publish( - "websocket", - JSON.stringify({ userId, courseId, type, message: payload }), - ) - } - } else { - await getRedisClient()?.publish( - "websocket", - JSON.stringify({ userId, courseId, type, message: payload }), - ) - } -} - -wsServer.on("request", (request: any) => { - logger.info("Request", request.origin) - const connection = request.accept("echo-protocol", request.origin) - - connection.on("message", async (message: any) => { - let data - try { - data = parseJSON(message.utf8Data) - } catch (error) { - logger.error("Error parsing message", error) - return - } - - if ( - data instanceof Object && - !Array.isArray(data) && - data.accessToken && - data.courseId - ) { - const accessToken = data.accessToken as string - const courseId = data.courseId as string - try { - let user = parseJSON( - (await getRedisClient()?.get(accessToken)) ?? "", - ) as UserInfo | null - if (!user) { - user = await getCurrentUserDetails(accessToken) - await getRedisClient()?.set(accessToken, JSON.stringify(user), { - EX: 3600, - }) - } - const userCourseObject = { - userId: user.id, - courseId, - } - connectionByUserCourse.set(JSON.stringify(userCourseObject), connection) - userCourseByConnection.set(connection, userCourseObject) - logger.info("Connection verified") - } catch (error) { - connection.drop() - logger.info("Connection rejected") - } - } else { - connection.drop() - } - }) - - connection.on("close", () => { - const userCourseObjectString = JSON.stringify( - userCourseByConnection.get(connection), - ) - userCourseByConnection.delete(connection) - connectionByUserCourse.delete(userCourseObjectString) - }) -}) - -const createSubscriber = async () => { - if (NEXUS_REFLECTION || isTest) { - return - } - - while (!getRedisClient()?.isOpen) { - logger.info("Waiting on main Redis client to be ready...") - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - subscriber = getRedisSubscriberClient() - - if (!subscriber) { - logger.error("Failed to get Redis subscriber client") - return - } - - subscriber.on("error", (err: any) => { - logger.error("Redis subscriber error", err) - }) - subscriber.on("ready", () => { - logger.info(`Redis subscriber connected`) - }) - - subscriber.on("message", (channel: string, message: string) => { - if (channel !== "websocket") { - return - } - - let data - try { - data = parseJSON(message) - } catch (error) { - logger.error("Error parsing message", error) - return - } - - if ( - data instanceof Object && - !Array.isArray(data) && - data.userId && - data.courseId && - data.type - ) { - const userId = data.userId as string - const courseId = data.courseId as string - const userCourseObjectString = JSON.stringify({ userId, courseId }) - const connection = connectionByUserCourse.get(userCourseObjectString) - if (connection) { - if (connection.connected) { - connection.sendUTF( - JSON.stringify({ - type: data.type, - message: data.message, - }), - ) - } else { - connectionByUserCourse.delete(userCourseObjectString) - } - } - } - }) - - if (!subscriber.isOpen) { - await subscriber.connect() - } - - await subscriber.subscribe("websocket", () => { - logger.info("Subscribed to websocket channel") - }) -} - -createSubscriber()