Skip to content

Commit 01440ae

Browse files
committed
Add Redis initialization to Kafka consumers and app startup
1 parent 55c8ba8 commit 01440ae

9 files changed

Lines changed: 58 additions & 2 deletions

File tree

backend/app.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { ServerContext } from "./context"
55
import prisma from "./prisma"
66
import server from "./server"
77
import knex from "./services/knex"
8+
import { initializeRedis } from "./services/redis"
89
import { attachPrismaEvents } from "./util"
910

1011
require("sharp") // image library sharp seems to crash without this require
@@ -35,6 +36,9 @@ const startApp = async () => {
3536
attachPrismaEvents(ctx)
3637

3738
if (!NEXUS_REFLECTION) {
39+
initializeRedis().catch((err) => {
40+
logger.warn("Redis initialization failed, continuing without cache", err)
41+
})
3842
await ctx.prisma.$connect()
3943
httpServer.listen(4000, () => {
4044
console.log("server running on port 4000")

backend/bin/kafkaConsumer/exerciseConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -21,6 +22,10 @@ const logger = sentryLogger({ service: "kafka-consumer-exercise" })
2122

2223
const consumer = createKafkaConsumer({ logger, prisma })
2324

25+
initializeRedis().catch((err) => {
26+
logger.warn("Redis initialization failed, continuing without cache", err)
27+
})
28+
2429
consumer.connect()
2530

2631
const context: KafkaContext = {

backend/bin/kafkaConsumer/userCoursePointsConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -20,6 +21,10 @@ const mutex = new Mutex()
2021
const logger = sentryLogger({ service: "kafka-consumer-user-course-points" })
2122
const consumer = createKafkaConsumer({ logger, prisma })
2223

24+
initializeRedis().catch((err) => {
25+
logger.warn("Redis initialization failed, continuing without cache", err)
26+
})
27+
2328
consumer.connect()
2429

2530
const context: KafkaContext = {

backend/bin/kafkaConsumer/userCoursePointsRealtimeConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -22,6 +23,10 @@ const logger = sentryLogger({
2223
})
2324
const consumer = createKafkaConsumer({ logger, prisma })
2425

26+
initializeRedis().catch((err) => {
27+
logger.warn("Redis initialization failed, continuing without cache", err)
28+
})
29+
2530
consumer.connect()
2631

2732
const context: KafkaContext = {

backend/bin/kafkaConsumer/userCourseProgressConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -22,6 +23,10 @@ const TOPIC_NAME = [config.user_course_progress_consumer.topic_name]
2223
const logger = sentryLogger({ service: "kafka-consumer-UserCourseProgress" })
2324
const consumer = createKafkaConsumer({ logger, prisma })
2425

26+
initializeRedis().catch((err) => {
27+
logger.warn("Redis initialization failed, continuing without cache", err)
28+
})
29+
2530
const context: KafkaContext = {
2631
prisma,
2732
logger,

backend/bin/kafkaConsumer/userCourseProgressRealtimeConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -25,6 +26,10 @@ const logger = sentryLogger({
2526
})
2627
const consumer = createKafkaConsumer({ logger, prisma })
2728

29+
initializeRedis().catch((err) => {
30+
logger.warn("Redis initialization failed, continuing without cache", err)
31+
})
32+
2833
consumer.connect()
2934

3035
const context: KafkaContext = {

backend/bin/kafkaConsumer/userPointsConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -20,6 +21,10 @@ const mutex = new Mutex()
2021
const logger = sentryLogger({ service: "kafka-consumer-user-points" })
2122
const consumer = createKafkaConsumer({ logger, prisma })
2223

24+
initializeRedis().catch((err) => {
25+
logger.warn("Redis initialization failed, continuing without cache", err)
26+
})
27+
2328
consumer.connect()
2429

2530
const context: KafkaContext = {

backend/bin/kafkaConsumer/userPointsRealtimeConsumer/kafkaConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { KafkaError } from "../../../lib/errors"
55
import sentryLogger from "../../../lib/logger"
66
import prisma from "../../../prisma"
77
import knex from "../../../services/knex"
8+
import { initializeRedis } from "../../../services/redis"
89
import { createKafkaConsumer } from "../common/createKafkaConsumer"
910
import { handleMessage } from "../common/handleMessage"
1011
import { KafkaContext } from "../common/kafkaContext"
@@ -20,6 +21,10 @@ const mutex = new Mutex()
2021
const logger = sentryLogger({ service: "kafka-consumer-user-points-realtime" })
2122
const consumer = createKafkaConsumer({ logger, prisma })
2223

24+
initializeRedis().catch((err) => {
25+
logger.warn("Redis initialization failed, continuing without cache", err)
26+
})
27+
2328
consumer.connect()
2429

2530
const context: KafkaContext = {

backend/services/redis.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const getRedisClient = (): RedisClient | undefined => {
4949
return
5050
}
5151

52-
let url = (REDIS_URL && REDIS_URL.trim()) || "redis://127.0.0.1:6379"
52+
let url = REDIS_URL?.trim() || "redis://127.0.0.1:6379"
5353
if (url && !url.startsWith("redis://") && !url.startsWith("rediss://")) {
5454
url = `redis://${url}`
5555
}
@@ -81,6 +81,23 @@ const getRedisClient = (): RedisClient | undefined => {
8181
return client
8282
}
8383

84+
export const initializeRedis = async (): Promise<void> => {
85+
_logger.info("Initializing Redis")
86+
try {
87+
const client = getRedisClient()
88+
if (client && !client.isOpen) {
89+
try {
90+
await client.connect()
91+
_logger.info("Redis connection established")
92+
} catch (err: any) {
93+
_logger.warn(`Redis connection failed, will retry automatically`, err)
94+
}
95+
}
96+
} catch (err: any) {
97+
_logger.warn(`Redis initialization error, continuing without cache`, err)
98+
}
99+
}
100+
84101
const getRedisSubscriberClient = (): RedisClient | undefined => {
85102
if (redisSubscriberClient) {
86103
return redisSubscriberClient
@@ -94,7 +111,7 @@ const getRedisSubscriberClient = (): RedisClient | undefined => {
94111
return
95112
}
96113

97-
let url = (REDIS_URL && REDIS_URL.trim()) || "redis://127.0.0.1:6379"
114+
let url = REDIS_URL?.trim() || "redis://127.0.0.1:6379"
98115
if (url && !url.startsWith("redis://") && !url.startsWith("rediss://")) {
99116
url = `redis://${url}`
100117
}

0 commit comments

Comments
 (0)