-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.js
More file actions
122 lines (110 loc) · 3.73 KB
/
db.js
File metadata and controls
122 lines (110 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Database layer -- SQLite via better-sqlite3
import Database from 'better-sqlite3';
import { readFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { ensureSchedulerDbParent, resolveSchedulerDbPath } from './paths.js';
const __dirname = dirname(fileURLToPath(import.meta.url));
let _db;
let _dbPath;
/**
* Override the DB path at runtime (must be called before getDb/initDb).
* Pass ':memory:' for in-memory test databases.
*/
export function setDbPath(path) {
if (_db) { _db.close(); _db = null; }
_dbPath = path;
}
export function getDb() {
if (!_db) {
const dbPath = _dbPath || resolveSchedulerDbPath({ env: process.env });
if (dbPath !== ':memory:') ensureSchedulerDbParent(dbPath);
_db = new Database(dbPath);
if (dbPath !== ':memory:') _db.pragma('journal_mode = WAL');
_db.pragma('busy_timeout = 5000');
_db.pragma('foreign_keys = ON');
}
return _db;
}
export function getResolvedDbPath() {
return _dbPath || resolveSchedulerDbPath({ env: process.env });
}
export async function initDb() {
const db = getDb();
const schema = readFileSync(join(__dirname, 'schema.sql'), 'utf8');
const hasUserTables = (db.prepare(`
SELECT COUNT(*) AS cnt
FROM sqlite_master
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
`).get()?.cnt ?? 0) > 0;
const applySchema = (label) => {
try {
db.exec(schema);
return true;
} catch (err) {
process.stderr.write(`${new Date().toISOString()} [db] ${label}: ${err.message}\n`);
return false;
}
};
const runConsolidate = async () => {
try {
const { default: consolidate } = await import('./migrate-consolidate.js');
const applied = consolidate();
if (applied) {
process.stderr.write(`${new Date().toISOString()} [db] Consolidation migration applied\n`);
}
} catch (err) {
process.stderr.write(`${new Date().toISOString()} [db] migrate-consolidate error: ${err.message}\n`);
}
};
if (hasUserTables) {
// Existing installs: normalize via migration first so schema re-apply doesn't
// trip over legacy partial tables/indexes.
await runConsolidate();
applySchema('Schema apply warning');
return db;
}
// Net-new installs: create the baseline schema, then run consolidation in case
// a package upgrade adds idempotent backfills the base schema doesn't need.
applySchema('Initial schema apply warning');
await runConsolidate();
// Re-apply schema so indexes/table defs are fully aligned after consolidation.
applySchema('Schema re-apply warning');
return db;
}
/**
* Checkpoint WAL to main DB file. Call periodically to minimize
* data loss window on crash/SIGKILL. Returns checkpoint stats.
*/
export function checkpointWal() {
if (!_db) return null;
try {
const result = _db.pragma('wal_checkpoint(PASSIVE)');
return result?.[0] || null;
} catch (err) {
const ts = new Date().toISOString();
process.stderr.write(`${ts} [db] WAL checkpoint error: ${err.message}\n`);
return null;
}
}
export function closeDb() {
if (_db) {
try {
// Checkpoint WAL to main DB before closing to prevent data loss
const result = _db.pragma('wal_checkpoint(TRUNCATE)');
const ts = new Date().toISOString();
if (result && result[0]) {
const r = result[0];
process.stderr.write(`${ts} [db] WAL checkpoint on close: busy=${r.busy}, checkpointed=${r.checkpointed}, log=${r.log}\n`);
} else {
process.stderr.write(`${ts} [db] WAL checkpoint on close: ok\n`);
}
} catch (err) {
const ts = new Date().toISOString();
process.stderr.write(`${ts} [db] WAL checkpoint failed on close: ${err.message}\n`);
}
_db.close();
_db = null;
}
}