-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Expand file tree
/
Copy pathclient.ts
More file actions
387 lines (342 loc) · 12.8 KB
/
client.ts
File metadata and controls
387 lines (342 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
/**
* Database Client
*
* Environment Variables:
* - DATABASE_URL: Primary database URL. Use a Neon URL for cloud, pglite:// for
* embedded local dev, or a vanilla postgresql:// URL.
*
* @module db/client
*/
import { AsyncLocalStorage } from "node:async_hooks";
import { Pool as NeonPool, neonConfig } from "@neondatabase/serverless";
import { drizzle as drizzleNeon } from "drizzle-orm/neon-serverless";
import type { NodePgDatabase, NodePgTransaction } from "drizzle-orm/node-postgres";
import { drizzle as drizzleNode } from "drizzle-orm/node-postgres";
import { drizzle as drizzlePGlite, type PgliteDatabase } from "drizzle-orm/pglite";
import type { ExtractTablesWithRelations } from "drizzle-orm/relations";
import { Pool as PgPool, type PoolConfig } from "pg";
import ws from "ws";
import { getCloudAwareEnv } from "../lib/runtime/cloud-bindings";
import { applyDatabaseUrlFallback } from "./database-url";
import { disableLocalPreparedStatements } from "./local-pg-query";
import * as schema from "./schemas";
// ============================================================================
// Types
// ============================================================================
/** Table/relation map extracted from the schema module; used to parameterize `DbTransaction`. */
type SchemaTables = ExtractTablesWithRelations<typeof schema>;
/** Canonical DB type for repositories: avoids union-of-drivers collapsing overloads. */
type Database = NodePgDatabase<typeof schema>;
/** Transaction handle for `writeTransaction` callbacks. */
type DbTransaction = NodePgTransaction<typeof schema, SchemaTables>;
/** Teardown callback for one database handle; may be synchronous or asynchronous. */
type DatabaseCloser = () => Promise<void> | void;
/**
 * Teardown callbacks keyed by database handle. Entries are written by
 * `registerDatabaseCloser` and looked up by `DatabaseConnectionManager.closeAll`.
 */
const databaseClosers = new WeakMap<Database, DatabaseCloser>();
/**
 * Associate a teardown callback with a database handle.
 *
 * @param database - Handle whose underlying client/pool `closer` releases.
 * @param closer - Callback stored in `databaseClosers` for later teardown.
 * @returns The same `database` instance, so the call can wrap a factory expression.
 */
function registerDatabaseCloser(database: Database, closer: DatabaseCloser): Database {
  // Record the callback first, then hand the untouched handle back to the caller.
  databaseClosers.set(database, closer);

  return database;
}
/**
 * Resolve the primary database URL from the cloud-aware environment.
 *
 * @returns The URL produced by `applyDatabaseUrlFallback`.
 * @throws Error when no URL could be resolved.
 */
function getPrimaryDatabaseUrl(): string {
  const resolvedUrl = applyDatabaseUrlFallback(getCloudAwareEnv());
  if (resolvedUrl) {
    return resolvedUrl;
  }
  throw new Error(
    "DATABASE_URL is not set. Use a Neon URL for cloud, a `pglite://<dir>` URL for embedded local dev, or a vanilla `postgresql://` URL.",
  );
}
// ============================================================================
// Database Connection Factory
// ============================================================================
/**
 * Checks if a database URL points at a Neon serverless host.
 *
 * The Neon markers are matched against the URL's hostname only, so a
 * non-Neon URL that merely mentions `neon.tech` in its credentials, database
 * name, or query string is not misrouted to the Neon driver. When the string
 * cannot be parsed as a URL, or parses without a host, the whole string is
 * searched instead.
 *
 * @param url - Database connection string.
 * @returns `true` when the host (or, as a fallback, the raw string) contains a Neon marker.
 */
function isNeonDatabase(url: string): boolean {
  const neonMarkers = ["neon.tech", "neon.database"];
  let candidate = url;
  try {
    const { hostname } = new URL(url);
    if (hostname) {
      candidate = hostname;
    }
  } catch {
    // Not parseable as a URL: keep matching against the raw string.
  }
  return neonMarkers.some((marker) => candidate.includes(marker));
}
/**
 * Detect the Cloudflare Workers runtime by probing for the `WebSocketPair`
 * global.
 *
 * @returns `true` when `globalThis` exists and exposes `WebSocketPair`.
 */
function isCloudflareWorkerRuntime(): boolean {
  if (typeof globalThis === "undefined") {
    return false;
  }
  return "WebSocketPair" in globalThis;
}
/**
 * Turn a `pglite://<dataDir>` URL into the data directory string for
 * `@electric-sql/pglite`.
 *
 * @param url - URL beginning with `pglite://`; the prefix length is removed without being checked.
 * @returns `"memory://"` when the remainder is empty or `memory`, otherwise the remainder unchanged.
 */
function parsePGliteDataDir(url: string): string {
  const scheme = "pglite://";
  const dataDir = url.slice(scheme.length);
  const isInMemory = dataDir === "" || dataDir === "memory";
  return isInMemory ? "memory://" : dataDir;
}
/**
 * Create a drizzle database backed by an embedded PGlite instance with the
 * `vector` extension loaded, so the schema's pgvector columns resolve at
 * migration and query time.
 *
 * The PGlite modules are loaded with a synchronous `require` inside the
 * function, which keeps the return type a plain `Database` rather than a
 * promise. PGlite is bun/node-only and does not exist on the Workers runtime.
 *
 * @param dataDir - Directory for persistent storage, or `"memory://"` for an in-memory instance.
 * @returns The drizzle handle, registered with a closer that closes the PGlite client.
 */
function createPGliteClient(dataDir: string): Database {
  const pgliteModule = require("@electric-sql/pglite") as typeof import("@electric-sql/pglite");
  const vectorModule =
    require("@electric-sql/pglite/vector") as typeof import("@electric-sql/pglite/vector");

  // `undefined` tells PGlite to skip on-disk storage.
  const storageDir = dataDir === "memory://" ? undefined : dataDir;
  const pglite = new pgliteModule.PGlite({
    dataDir: storageDir,
    extensions: { vector: vectorModule.vector },
  });

  const orm: PgliteDatabase<typeof schema> = drizzlePGlite({ client: pglite, schema });
  const closePglite = async (): Promise<void> => {
    await pglite.close();
  };
  return registerDatabaseCloser(orm as Database, closePglite);
}
/**
 * Report whether `url` is a `postgres:` / `postgresql:` URL whose host is
 * `127.0.0.1` or `localhost`.
 *
 * @param url - Candidate connection string.
 * @returns `false` for any string that cannot be parsed as a URL.
 */
function isLocalTcpPostgresUrl(url: string): boolean {
  let target: URL;
  try {
    target = new URL(url);
  } catch {
    return false;
  }
  const postgresProtocols = ["postgres:", "postgresql:"];
  const loopbackHosts = ["127.0.0.1", "localhost"];
  return postgresProtocols.includes(target.protocol) && loopbackHosts.includes(target.hostname);
}
/**
 * Parse a base-10 integer setting, substituting `fallback` when the value is
 * missing, empty, not numeric, not finite, or not greater than zero.
 *
 * Parsing uses `Number.parseInt`, so a numeric prefix is accepted
 * (`"12px"` yields `12`).
 *
 * @param value - Raw setting, typically from the environment.
 * @param fallback - Value returned when `value` is unusable.
 * @returns The parsed positive integer, or `fallback`.
 */
function parsePositiveInteger(value: string | undefined, fallback: number): number {
  const candidate = value ? Number.parseInt(value, 10) : Number.NaN;
  const isUsable = Number.isFinite(candidate) && candidate > 0;
  return isUsable ? candidate : fallback;
}
/**
 * Build a `pg` pool for `url`, tuned by runtime and target host.
 *
 * - Cloudflare Worker runtime: sets `max` from `LOCAL_PG_POOL_MAX` (default 1),
 *   sets `maxUses`, and uses a 30s connection timeout.
 * - Loopback `postgres:` / `postgresql:` URL (see `isLocalTcpPostgresUrl`):
 *   sets `max` from `LOCAL_PG_POOL_MAX` (default 8), uses 30s idle and
 *   connection timeouts, and passes the pool through
 *   `disableLocalPreparedStatements`.
 * - Anything else: only `connectionString` is set.
 *
 * @param url - Postgres connection string.
 * @returns The configured pool.
 */
function createPgPool(url: string): PgPool {
const options: PoolConfig = { connectionString: url };
const env = getCloudAwareEnv();
const inWorkerRuntime = isCloudflareWorkerRuntime();
const isLocalTcp = isLocalTcpPostgresUrl(url);
if (inWorkerRuntime) {
options.max = parsePositiveInteger(env.LOCAL_PG_POOL_MAX, 1);
// Discard connections after a single query — Workers can't reliably
// share I/O across requests. EXCEPT against local PGlite: the PGlite
// socket bridge is fragile and creating a fresh TCP connection per
// query causes "Connection terminated unexpectedly" mid-stream. Local
// dev uses long-lived connections instead; the per-request isolation
// workers need only matters for shared remote pools.
// NOTE(review): presumably `maxUses: 0` means "no use limit" in pg-pool — confirm.
options.maxUses = isLocalTcp ? 0 : 1;
options.connectionTimeoutMillis = 30_000;
}
if (isLocalTcp) {
// NOTE(review): this assignment runs after the worker branch, so when both
// conditions hold the effective default for `max` is 8, not 1. The comment
// below says the worker branch "already overrides max" — confirm which
// default is intended for worker + local.
options.max = parsePositiveInteger(env.LOCAL_PG_POOL_MAX, 8);
// Keep idle connections around long enough that consecutive requests
// reuse them instead of churning the PGlite socket bridge. Worker-runtime
// already overrides max + maxUses above; this just bumps idle to 30s.
options.idleTimeoutMillis = 30_000;
options.connectionTimeoutMillis = 30_000;
}
const pool = new PgPool(options);
if (isLocalTcp) {
// `simpleQueryMode` is enabled only when also running inside the Worker runtime.
disableLocalPreparedStatements(pool, { simpleQueryMode: inWorkerRuntime });
}
return pool;
}
/**
 * Open a database handle for `url`, choosing the driver from the URL:
 * `pglite://` uses embedded PGlite, Neon hosts use the Neon WebSocket pool,
 * and everything else uses a node-postgres pool.
 *
 * @param url - Database connection string.
 * @returns A drizzle handle registered with a closer for its client/pool.
 * @throws Error when a `pglite://` URL is used inside a Cloudflare Worker.
 */
function createConnection(url: string): Database {
  const isEmbedded = url.startsWith("pglite://");
  if (isEmbedded && isCloudflareWorkerRuntime()) {
    throw new Error("pglite:// URLs are local-only and cannot run inside a Cloudflare Worker.");
  }
  if (isEmbedded) {
    return createPGliteClient(parsePGliteDataDir(url));
  }

  if (!isNeonDatabase(url)) {
    const pgPool = createPgPool(url);
    const nodeDb = drizzleNode(pgPool, { schema }) as Database;
    return registerDatabaseCloser(nodeDb, () => pgPool.end());
  }

  // WebSocket pool both on Workers and Node — neon-http does not implement
  // `db.transaction(...)`, which breaks `writeTransaction` in db/helpers.ts.
  const hasNativeWebSocket = typeof WebSocket !== "undefined";
  if (!hasNativeWebSocket) {
    neonConfig.webSocketConstructor = ws;
  }
  const neonPool = new NeonPool({ connectionString: url });
  const neonDb = drizzleNeon(neonPool, { schema }) as Database;
  return registerDatabaseCloser(neonDb, () => neonPool.end());
}
// ============================================================================
// Connection Manager
// ============================================================================
/**
 * Per-request DB cache for the Workers runtime.
 *
 * Cloudflare Workers refuse to share I/O objects (TCP sockets, WebSockets,
 * streams) across requests — a `Database` whose underlying pool was opened
 * during request A throws when used in request B with:
 *
 *   "Cannot perform I/O on behalf of a different request. (I/O type: Native)"
 *
 * Bootstrap middleware enters `runWithDbCacheAsync(...)` once per fetch
 * invocation so each request gets its own `Map<url, Database>`, keyed by the
 * database URL passed to `getConnection`. Outside Workers (Node, tests) the
 * connection manager does not consult this store and uses its process-level
 * cache instead.
 */
const dbCacheAls = new AsyncLocalStorage<Map<string, Database>>();
/**
 * Run `fn` with a fresh, empty request-scoped database cache installed in
 * `dbCacheAls`.
 *
 * @param fn - Callback executed inside the new cache scope.
 * @returns Whatever `fn` returns.
 */
export function runWithDbCache<T>(fn: () => T): T {
  const requestScopedCache = new Map<string, Database>();
  return dbCacheAls.run(requestScopedCache, fn);
}
/**
 * Async counterpart of `runWithDbCache`: run `fn` with a fresh, empty
 * request-scoped database cache and await its result.
 *
 * @param fn - Async callback executed inside the new cache scope.
 * @returns The value `fn` resolves to.
 */
export async function runWithDbCacheAsync<T>(fn: () => Promise<T>): Promise<T> {
  const requestScopedCache = new Map<string, Database>();
  const result = await dbCacheAls.run(requestScopedCache, fn);
  return result;
}
/**
 * Singleton connection manager for all database connections.
 *
 * On Workers the per-request store from `dbCacheAls` is preferred; the
 * instance-level `connections` Map is only used in Node/local where pools
 * can safely live for the lifetime of the process.
 */
class DatabaseConnectionManager {
  /** Process-level cache keyed by database URL (used outside the Workers runtime). */
  private readonly connections = new Map<string, Database>();

  /**
   * Get or create a database connection.
   *
   * Workers: caches in the request-scoped ALS store so I/O objects stay
   * within the originating request handler. If no ALS store exists (e.g.
   * cron / scheduled handlers that didn't enter the bootstrap middleware),
   * a fresh, uncached connection is created on every call.
   *
   * Elsewhere: caches in the process-level map.
   *
   * @param url - Database connection string; also the cache key.
   * @returns The cached or newly created database handle.
   */
  getConnection(url: string): Database {
    if (!isCloudflareWorkerRuntime()) {
      return this.getOrCreate(this.connections, url);
    }
    const requestCache = dbCacheAls.getStore();
    if (!requestCache) {
      return createConnection(url);
    }
    return this.getOrCreate(requestCache, url);
  }

  /**
   * Get write connection.
   */
  getWriteConnection(): Database {
    return this.getConnection(getPrimaryDatabaseUrl());
  }

  /**
   * Get read connection. Currently resolves to the same primary URL as
   * `getWriteConnection`.
   */
  getReadConnection(): Database {
    return this.getConnection(getPrimaryDatabaseUrl());
  }

  /**
   * Get connection info for debugging.
   *
   * @returns Whether a database URL can be resolved from the environment.
   */
  getConnectionInfo(): {
    databaseUrlConfigured: boolean;
  } {
    const env = getCloudAwareEnv();
    return {
      databaseUrlConfigured: !!applyDatabaseUrlFallback(env),
    };
  }

  /**
   * Close and forget cached connections.
   *
   * Covers the process-level cache and, when called inside a
   * `runWithDbCache` / `runWithDbCacheAsync` scope, that scope's
   * request cache as well. Each handle's registered closer is invoked so
   * evicted pools are released instead of being dropped while still open.
   *
   * Used by local test/dev harnesses that bring up and tear down ephemeral
   * Postgres/PGlite servers in the same Node/Bun process.
   */
  async closeAll(): Promise<void> {
    const requestCache = dbCacheAls.getStore();

    // A Set guards against invoking the same handle's closer twice.
    const databases = new Set<Database>(Array.from(this.connections.values()));
    requestCache?.forEach((database) => databases.add(database));

    // Evict before closing so concurrent callers never receive a closing handle.
    this.connections.clear();
    requestCache?.clear();

    await Promise.all(
      Array.from(databases, async (database) => {
        await databaseClosers.get(database)?.();
      }),
    );
  }

  /** Return the handle cached under `url`, creating and caching it on first use. */
  private getOrCreate(cache: Map<string, Database>, url: string): Database {
    let database = cache.get(url);
    if (!database) {
      database = createConnection(url);
      cache.set(url, database);
    }
    return database;
  }
}
/** Module-level manager instance used by the exported database proxies and helper functions in this file. */
const connectionManager = new DatabaseConnectionManager();
// ============================================================================
// Exported Database Instances
// ============================================================================
/**
 * Build a `Database` facade that resolves the real connection lazily.
 *
 * Every property read calls `resolve()` and forwards to the handle it
 * returns, so nothing is connected at module load and each access goes
 * through the connection manager's current cache. Function-valued members
 * are bound to the real handle so their `this` is the database rather than
 * the proxy.
 *
 * @param resolve - Returns the database handle to forward the access to.
 * @returns A proxy typed as `Database`.
 */
function createLazyDatabaseProxy(resolve: () => Database): Database {
  return new Proxy({} as Database, {
    get: (_target, prop) => {
      const database = resolve();
      const value = database[prop as keyof typeof database];
      return typeof value === "function" ? value.bind(database) : value;
    },
  });
}
/**
 * Primary database - routes to the primary write connection.
 * Equivalent to `dbWrite`; prefer `dbRead` / `dbWrite` for read/write intent clarity.
 */
export const db = createLazyDatabaseProxy(() => connectionManager.getWriteConnection());
/**
 * Read-intent database connection.
 * Currently uses the primary DATABASE_URL; keep this alias for repository
 * read/write intent clarity after regional replicas were removed.
 *
 * @example
 * // Read-intent query
 * const users = await dbRead.query.users.findMany();
 */
export const dbRead = createLazyDatabaseProxy(() => connectionManager.getReadConnection());
/**
 * Write database - routes to the primary connection.
 * Use for INSERT, UPDATE, DELETE operations
 *
 * @example
 * // Write to primary
 * await dbWrite.insert(users).values({ name: 'John' });
 */
export const dbWrite = createLazyDatabaseProxy(() => connectionManager.getWriteConnection());
// ============================================================================
// Helper Functions
// ============================================================================
/**
 * Report connection diagnostics for debugging/monitoring.
 *
 * @returns The connection manager's info object (`databaseUrlConfigured`).
 */
export function getDbConnectionInfo() {
  const connectionInfo = connectionManager.getConnectionInfo();
  return connectionInfo;
}
/**
 * Tear down the connection manager's cached DB pools; intended for local
 * test/dev teardown.
 *
 * @returns A promise that settles once `connectionManager.closeAll()` has finished.
 */
export async function closeDatabaseConnectionsForTests(): Promise<void> {
  // Delegate entirely to the manager; this wrapper exists only as the exported entry point.
  await connectionManager.closeAll();
}
/**
 * Run `fn` against the read-intent connection.
 *
 * @param fn - Callback that receives the database handle.
 * @returns The value `fn` resolves to.
 */
export async function withReadDb<T>(fn: (db: Database) => Promise<T>): Promise<T> {
  const readDatabase = connectionManager.getReadConnection();
  return fn(readDatabase);
}
/**
 * Run `fn` against the write (primary) connection.
 *
 * @param fn - Callback that receives the database handle.
 * @returns The value `fn` resolves to.
 */
export async function withWriteDb<T>(fn: (db: Database) => Promise<T>): Promise<T> {
  const writeDatabase = connectionManager.getWriteConnection();
  return fn(writeDatabase);
}
// ============================================================================
// Type Exports
// ============================================================================
export type { Database, DbTransaction };