Skip to content

Commit ccc65c4

Browse files
chore: add shared redis client
1 parent 7b5387c commit ccc65c4

5 files changed

Lines changed: 64 additions & 17 deletions

File tree

app/external/src/persistence.module.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {PKCE_SESSION_REPOSITORY_TOKEN, REFRESH_TOKEN_REPOSITORY_TOKEN} from "@se
3636
import {ConfigModule} from "./config.module"
3737
import {QueueModule} from "./queue/queue.module"
3838
import {BullQueueProvider} from "./queue/queue.provider"
39+
import {RedisClient} from "./redis"
3940

4041
const agentRepository = {
4142
provide: AGENT_REPOSITORY_TOKEN,
@@ -137,7 +138,7 @@ const repositories = [
137138

138139
@Module({
139140
imports: [ConfigModule, QueueModule],
140-
providers: [DatabaseClient, ...repositories, queueProvider],
141+
providers: [DatabaseClient, ...repositories, queueProvider, RedisClient],
141142
exports: [...repositories, queueProvider]
142143
})
143144
export class PersistenceModule {}

app/external/src/rate-limiter/rate-limiter.provider.ts

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {ConsumePointsError, RateLimiterProvider} from "@services"
66
import * as TE from "fp-ts/lib/TaskEither"
77
import {pipe} from "fp-ts/lib/function"
88
import {TaskEither} from "fp-ts/lib/TaskEither"
9+
import {waitForRedisConnection} from "../redis/redis.utils"
910

1011
export {RateLimiterRes} from "rate-limiter-flexible"
1112

@@ -26,19 +27,7 @@ export class RedisRateLimiterProvider implements RateLimiterProvider, OnModuleIn
2627
enableOfflineQueue: false
2728
})
2829

29-
// Wait for the Redis TCP connection to be established before accepting requests.
30-
// This is required because enableOfflineQueue is false, which means commands sent
31-
// before the connection is ready will fail immediately instead of being queued.
32-
// The "error" listener ensures that connection failures (e.g. wrong host/port)
33-
// surface at startup rather than silently falling through to the fail-open path.
34-
try {
35-
await new Promise<void>((resolve, reject) => {
36-
this.getRedisClient().once("ready", resolve)
37-
this.getRedisClient().once("error", reject)
38-
})
39-
} catch {
40-
throw new Error(`Unable to connect to Redis at ${config.host}:${config.port}`)
41-
}
30+
await waitForRedisConnection(this.getRedisClient(), "rate-limiter-redis-client")
4231

4332
this.rateLimiter = new RateLimiterRedis({
4433
storeClient: this.redisClient,
@@ -71,9 +60,7 @@ export class RedisRateLimiterProvider implements RateLimiterProvider, OnModuleIn
7160
this.getRateLimiter()
7261
.consume(key, points)
7362
.catch(error => {
74-
if (error instanceof RateLimiterRes) {
75-
return error
76-
}
63+
if (error instanceof RateLimiterRes) return error
7764
throw error
7865
})
7966
),

app/external/src/redis/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from "./redis-client"
2+
export * from "./redis.utils"
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import {Injectable, OnModuleDestroy, OnModuleInit} from "@nestjs/common"
2+
import Redis from "ioredis"
3+
import {ConfigProvider} from "../config"
4+
import {waitForRedisConnection} from "./redis.utils"
5+
6+
@Injectable()
7+
export class RedisClient extends Redis implements OnModuleInit, OnModuleDestroy {
8+
constructor(readonly configProvider: ConfigProvider) {
9+
const config = configProvider.redisConfig
10+
super({
11+
host: config.host,
12+
port: config.port,
13+
db: config.db,
14+
enableOfflineQueue: false
15+
})
16+
}
17+
18+
async onModuleInit() {
19+
await waitForRedisConnection(this, "shared-redis-client")
20+
}
21+
22+
onModuleDestroy() {
23+
this.disconnect()
24+
}
25+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import {Logger} from "@nestjs/common"
2+
import Redis from "ioredis"
3+
4+
/**
5+
* Ensures a Redis connection is fully established before continuing.
6+
*
7+
* By default, ioredis connects asynchronously. We wait for the `"ready"` event here to guarantee the connection is usable.
8+
*
9+
* We also listen for `"error"` during this startup phase to eagerly throw an error and abort
10+
* startup if the Redis server is unreachable (e.g. wrong host or port).
11+
*
12+
* A check for `client.status === "ready"` is included to avoid a race condition where the
13+
* connection is established instantly and the `"ready"` event fires
14+
* *before* the promise listener is attached, which would cause the startup to hang indefinitely.
15+
*/
16+
export async function waitForRedisConnection(client: Redis, clientName: string): Promise<void> {
17+
if (client.status === "ready") {
18+
Logger.log(`Redis connection ${clientName} already ready`, "RedisClient")
19+
return
20+
}
21+
22+
try {
23+
await new Promise<void>((resolve, reject) => {
24+
client.once("ready", resolve)
25+
client.once("error", reject)
26+
})
27+
Logger.log(`Redis connection ${clientName} ready`, "RedisClient")
28+
} catch {
29+
const options = client.options
30+
throw new Error(`Unable to connect to Redis at ${options.host}:${options.port}`)
31+
}
32+
}

0 commit comments

Comments
 (0)