|
1 | | -import { type InsertObject, Kysely, type QueryExecutor } from 'kysely' |
| 1 | +import { type InsertObject, Kysely } from 'kysely' |
2 | 2 | import { PostgresJSDialect } from 'kysely-postgres-js' |
3 | 3 | import postgres from 'postgres' |
4 | 4 |
|
5 | 5 | import { env } from '#/env.ts' |
6 | 6 | import type { DB } from './generated/index.ts' |
7 | 7 |
|
8 | 8 | export type Row<T extends keyof DB> = InsertObject<DB, T> |
9 | | -// @ts-ignore |
10 | | -let postgresClient = postgres(env.DATABASE_URL, { max: 1, debug: false, no_cache: true }) |
| 9 | + |
| 10 | +let postgresClient = postgres(env.DATABASE_URL) |
11 | 11 | let kyselyInstance = new Kysely<DB>({ |
12 | 12 | dialect: new PostgresJSDialect({ postgres: postgresClient }) |
13 | 13 | }) |
14 | 14 |
|
15 | 15 | let retryTimeout: ReturnType<typeof setTimeout> | null = null |
16 | | -let reconnectCount = 0 |
17 | | -let lastDisconnectAt: number | null = null |
18 | | -let isReconnecting = false |
19 | | - |
20 | | -// Optional hook: users of this module can assign this to observe reconnection events |
21 | | -export let onReconnect: ((info: { reconnectCount: number; since: number | null }) => void) | null = null |
22 | 16 |
|
23 | 17 | function scheduleReconnect() { |
24 | | - if (retryTimeout || isReconnecting) return |
25 | | - lastDisconnectAt = Date.now() |
26 | | - isReconnecting = true |
27 | | - |
28 | | - retryTimeout = setTimeout(async () => { |
| 18 | + if (retryTimeout) return |
| 19 | + retryTimeout = setTimeout(() => { |
29 | 20 | retryTimeout = null |
30 | 21 | try { |
31 | | - await postgresClient.end({ timeout: 0 }) // clean shutdown of old client |
32 | | - // @ts-ignore |
33 | | - postgresClient = postgres(env.DATABASE_URL, { max: 1, debug: false, no_cache: true }) |
| 22 | + postgresClient = postgres(env.DATABASE_URL) |
34 | 23 | kyselyInstance = new Kysely<DB>({ |
35 | 24 | dialect: new PostgresJSDialect({ postgres: postgresClient }) |
36 | 25 | }) |
37 | | - reconnectCount++ |
38 | | - isReconnecting = false |
39 | 26 | console.log('[POSTGRES] Reconnected successfully') |
40 | | - |
41 | | - if (onReconnect) { |
42 | | - onReconnect({ reconnectCount, since: lastDisconnectAt }) |
43 | | - } |
44 | 27 | } catch (err) { |
45 | 28 | console.error('[POSTGRES] Reconnection attempt failed:', err) |
46 | | - isReconnecting = false |
47 | 29 | scheduleReconnect() |
48 | 30 | } |
49 | 31 | }, 5000) |
50 | 32 | } |
51 | 33 |
|
52 | | -function wrapQueryErrorHandling< |
53 | | - T extends QueryExecutor & { execute: Function; executeTakeFirst: Function; executeTakeFirstOrThrow: Function } |
54 | | ->(executor: T): T { |
55 | | - const originalExecute = executor.execute |
56 | | - const originalExecuteTakeFirst = executor.executeTakeFirst |
57 | | - const originalExecuteTakeFirstOrThrow = executor.executeTakeFirstOrThrow |
58 | | - |
59 | | - executor.execute = async function (...args: any[]) { |
60 | | - if (isReconnecting) throw new Error('[POSTGRES] Currently reconnecting — query blocked.') |
61 | | - try { |
62 | | - return await originalExecute.apply(this, args) |
63 | | - } catch (err) { |
64 | | - console.error('[POSTGRES] Query failed:', err) |
65 | | - scheduleReconnect() |
66 | | - throw err |
67 | | - } |
68 | | - } |
69 | | - |
70 | | - executor.executeTakeFirst = async function (...args: any[]) { |
71 | | - if (isReconnecting) throw new Error('[POSTGRES] Currently reconnecting — query blocked.') |
72 | | - try { |
73 | | - return await originalExecuteTakeFirst.apply(this, args) |
74 | | - } catch (err) { |
75 | | - console.error('[POSTGRES] Query failed:', err) |
76 | | - scheduleReconnect() |
77 | | - throw err |
78 | | - } |
79 | | - } |
80 | | - |
81 | | - executor.executeTakeFirstOrThrow = async function (...args: any[]) { |
82 | | - if (isReconnecting) throw new Error('[POSTGRES] Currently reconnecting — query blocked.') |
83 | | - try { |
84 | | - return await originalExecuteTakeFirstOrThrow.apply(this, args) |
85 | | - } catch (err) { |
86 | | - console.error('[POSTGRES] Query failed:', err) |
87 | | - scheduleReconnect() |
88 | | - throw err |
89 | | - } |
90 | | - } |
| 34 | +const database: Kysely<DB> = new Proxy({} as Kysely<DB>, { |
| 35 | + get(_, prop) { |
| 36 | + const target = kyselyInstance[prop as keyof Kysely<DB>] |
91 | 37 |
|
92 | | - return executor |
93 | | -} |
94 | | - |
95 | | -const database = new Proxy(kyselyInstance, { |
96 | | - get(target, prop, receiver) { |
97 | | - const value = Reflect.get(target, prop, receiver) |
98 | | - if (typeof value === 'function') { |
| 38 | + if (typeof target === 'function') { |
99 | 39 | return (...args: any[]) => { |
100 | | - const result = value.apply(target, args) |
101 | | - return result?.execute ? wrapQueryErrorHandling(result) : result |
| 40 | + try { |
| 41 | + return (kyselyInstance[prop as keyof Kysely<DB>] as any)(...args) |
| 42 | + } catch (err) { |
| 43 | + console.error(`[POSTGRES] Kysely method ${String(prop)} failed:`, err) |
| 44 | + scheduleReconnect() |
| 45 | + throw err |
| 46 | + } |
102 | 47 | } |
103 | 48 | } |
104 | | - return value |
| 49 | + |
| 50 | + return target |
105 | 51 | } |
106 | 52 | }) |
107 | 53 |
|
|
0 commit comments