-
Notifications
You must be signed in to change notification settings - Fork 82
Expand file tree
/
Copy pathDatabaseConnection.ts
More file actions
124 lines (117 loc) · 3.77 KB
/
DatabaseConnection.ts
File metadata and controls
124 lines (117 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import { Pool } from 'pg';
import { env } from '~/config';
const DB_HOST = env.get('DB_HOST');
const DB_PORT = env.get('DB_PORT');
const DB_USER = env.get('DB_USER');
const DB_PASS = env.get('DB_PASS');
const DB_NAME = env.get('DB_NAME');
export class DatabaseConnection {
private static instance: DatabaseConnection;
private pool: Pool;
constructor() {
const config = {
host: DB_HOST as string,
port: Number(DB_PORT) as number,
user: DB_USER as string,
password: DB_PASS as string,
database: DB_NAME as string,
ssl:
process.env.NODE_ENV === 'production'
? { rejectUnauthorized: false }
: false,
max: 15,
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 10000,
};
this.pool = new Pool(config);
}
static getInstance() {
if (!DatabaseConnection.instance) {
DatabaseConnection.instance = new DatabaseConnection();
}
return DatabaseConnection.instance;
}
async query(statement: string, params: any) {
const connection = await this.pool.connect();
try {
const result = await connection.query(statement, params);
return result.rows;
} catch (error) {
try {
await connection.query('ROLLBACK');
} catch {}
throw error;
} finally {
connection.release();
}
}
/**
* Execute a query with per-session statement_timeout and lock_timeout.
*
* REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block.
* For those queries, timeouts are applied as session-level SET commands and
* reset afterward. For all other queries, SET LOCAL inside BEGIN/COMMIT is used
* so timeouts don't leak to other connections.
*/
async queryWithTimeout(
statement: string,
params: any,
statementTimeoutMs: number,
lockTimeoutMs: number,
) {
// VACUUM and REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction block
const requiresNoTransaction =
/REFRESH\s+MATERIALIZED\s+VIEW\s+CONCURRENTLY/i.test(statement) ||
/^\s*VACUUM\b/i.test(statement);
const connection = await this.pool.connect();
try {
if (requiresNoTransaction) {
// Cannot use BEGIN/COMMIT — apply timeouts at session level and reset after
await connection.query(`SET statement_timeout = ${statementTimeoutMs}`);
await connection.query(`SET lock_timeout = ${lockTimeoutMs}`);
const result = await connection.query(statement, params);
await connection.query('RESET statement_timeout');
await connection.query('RESET lock_timeout');
return result.rows;
}
await connection.query('BEGIN');
await connection.query(
`SET LOCAL statement_timeout = ${statementTimeoutMs}`,
);
await connection.query(`SET LOCAL lock_timeout = ${lockTimeoutMs}`);
const result = await connection.query(statement, params);
await connection.query('COMMIT');
return result.rows;
} catch (error) {
if (!requiresNoTransaction) {
try {
await connection.query('ROLLBACK');
} catch {}
} else {
try {
await connection.query('RESET statement_timeout');
await connection.query('RESET lock_timeout');
} catch {}
}
throw error;
} finally {
connection.release();
}
}
async executeTransaction(queries: { statement: string; params: any }[]) {
const connection = await this.pool.connect();
try {
await connection.query('begin');
for (const query of queries) {
await connection.query(query.statement, query.params);
}
await connection.query('commit');
} catch (e: any) {
await connection.query('rollback');
throw e;
} finally {
connection.release();
}
}
}