|
| 1 | +package nomad |
| 2 | + |
| 3 | +import java.sql.Connection |
| 4 | +import javax.sql.DataSource |
| 5 | +import org.slf4j.LoggerFactory |
| 6 | +import scala.util.Using |
| 7 | + |
| 8 | +/** Rebases a target Postgres database from a long-lived "rebase" template database |
| 9 | + * on the same Postgres cluster, using a fast file-level template copy. |
| 10 | + * |
| 11 | + * The rebase workflow is designed for developers who maintain a long-lived rebase |
| 12 | + * database at a known-good point in time (e.g., a restored production backup with |
| 13 | + * most migrations already applied). When their working database becomes |
| 14 | + * contaminated or needs to be reset, [[rebase]] drops it and re-clones it from |
| 15 | + * the rebase database via `CREATE DATABASE target WITH TEMPLATE rebase`. Any |
| 16 | + * pending migrations can then be applied on top via [[Migrator.migrate]]. |
| 17 | + * |
| 18 | + * The copy is a Postgres file-level operation: it does not re-execute schema or |
| 19 | + * data DDL, so it is dramatically faster than restoring from a logical dump. |
| 20 | + * |
| 21 | + * '''Destructive''': this drops the entire target database. The caller is |
| 22 | + * responsible for any confirmation prompting. |
| 23 | + * |
| 24 | + * Constraints (all enforced by [[rebase]] and reported as exceptions): |
| 25 | + * - Database type must be [[SupportedDatabase.Postgres]] (H2 is rejected). |
| 26 | + * - Both datasources must connect to the same Postgres cluster — the host |
| 27 | + * and port parsed from each JDBC URL must match exactly. |
| 28 | + * - Multi-host failover JDBC URLs are rejected. |
| 29 | + * - The target database must already exist (Nomad does not create databases). |
| 30 | + * - Target and rebase database names must differ. |
| 31 | + * |
| 32 | + * @param mainDatasource the working datasource — its target database is dropped |
| 33 | + * and recreated from the rebase template |
| 34 | + * @param rebaseDatasource the rebase datasource — never dropped or modified; |
| 35 | + * used both as the template source and as the connection |
| 36 | + * from which `DROP DATABASE` and `CREATE DATABASE` are |
| 37 | + * issued (these statements cannot be issued from a |
| 38 | + * connection on the target database itself) |
| 39 | + * @param db the database type (must be Postgres) |
| 40 | + */ |
| 41 | +class Rebaser( |
| 42 | + mainDatasource: DataSource, |
| 43 | + rebaseDatasource: DataSource, |
| 44 | + db: SupportedDatabase |
| 45 | +) { |
| 46 | + |
| 47 | + /** Tags a datasource in error messages so users know which one failed. */ |
| 48 | + private enum Role derives CanEqual { |
| 49 | + case Main, Rebase |
| 50 | + |
| 51 | + def label: String = this match { |
| 52 | + case Main => "main" |
| 53 | + case Rebase => "rebase" |
| 54 | + } |
| 55 | + } |
| 56 | + |
| 57 | + private val logger = LoggerFactory.getLogger(classOf[Rebaser]) |
| 58 | + |
| 59 | + /** Drops the target database and recreates it as a fast file-level clone of the |
| 60 | + * rebase database. |
| 61 | + * |
| 62 | + * Sequence: |
| 63 | + * 1. Verify Postgres, parse host:port from each JDBC URL, fail on mismatch. |
| 64 | + * 2. Resolve target and rebase database names via `Connection.getCatalog`. |
| 65 | + * 3. From a single connection on the rebase datasource: |
| 66 | + * - terminate other sessions on the target database, |
| 67 | + * - `DROP DATABASE IF EXISTS target`, |
| 68 | + * - terminate other sessions on the rebase database (required because |
| 69 | + * Postgres rejects `CREATE DATABASE ... TEMPLATE` if any other session |
| 70 | + * is connected to the template source), |
| 71 | + * - `CREATE DATABASE target WITH TEMPLATE rebase`. |
| 72 | + * |
| 73 | + * The issuing session is excluded from the terminate step via |
| 74 | + * `pg_backend_pid()`; Postgres permits the template source to have the |
| 75 | + * issuing session connected during the copy as long as no other sessions |
| 76 | + * are present. |
| 77 | + * |
| 78 | + * @throws java.lang.IllegalArgumentException if `db` is not Postgres, the datasources |
| 79 | + * are on different servers, a URL is |
| 80 | + * multi-host or non-Postgres, or target == |
| 81 | + * rebase |
| 82 | + * @throws java.lang.IllegalStateException if the target database cannot be reached |
| 83 | + * (typically because it does not yet exist) |
| 84 | + */ |
| 85 | + def rebase(): Unit = { |
| 86 | + db match { |
| 87 | + case SupportedDatabase.Postgres => () |
| 88 | + case other => |
| 89 | + throw new IllegalArgumentException( |
| 90 | + s"nomadRebase requires Postgres, got: $other" |
| 91 | + ) |
| 92 | + } |
| 93 | + |
| 94 | + val (mainUrl, targetDbName) = readDatasourceMetadata(mainDatasource, Role.Main) |
| 95 | + val (rebaseUrl, rebaseDbName) = readDatasourceMetadata(rebaseDatasource, Role.Rebase) |
| 96 | + val mainServer = parseSingleHost(mainUrl, Role.Main) |
| 97 | + val rebaseServer = parseSingleHost(rebaseUrl, Role.Rebase) |
| 98 | + |
| 99 | + if (mainServer != rebaseServer) { |
| 100 | + throw new IllegalArgumentException( |
| 101 | + s"Main and rebase datasources must be on the same Postgres server. " + |
| 102 | + s"Main is on '$mainServer', rebase is on '$rebaseServer'." |
| 103 | + ) |
| 104 | + } |
| 105 | + |
| 106 | + if (targetDbName == rebaseDbName) { |
| 107 | + throw new IllegalArgumentException( |
| 108 | + s"Target and rebase databases must differ (both resolved to '$targetDbName'). " + |
| 109 | + s"Refusing to template a database from itself." |
| 110 | + ) |
| 111 | + } |
| 112 | + |
| 113 | + logger.info( |
| 114 | + s"Rebasing target database '$targetDbName' from rebase database '$rebaseDbName' on $mainServer." |
| 115 | + ) |
| 116 | + |
| 117 | + Using.resource(rebaseDatasource.getConnection) { conn => |
| 118 | + // DROP DATABASE and CREATE DATABASE ... WITH TEMPLATE cannot run inside a |
| 119 | + // transaction block. DataSource#getConnection does not guarantee autoCommit |
| 120 | + // is true (pools configured for ORM-style apps often default to false), so |
| 121 | + // force it true for the duration of admin work and restore the borrowed |
| 122 | + // state on the way out — pooled connections returned in a different state |
| 123 | + // than they were borrowed in are a known source of cross-borrow surprises. |
| 124 | + val originalAutoCommit = conn.getAutoCommit |
| 125 | + conn.setAutoCommit(true) |
| 126 | + try |
| 127 | + // ALLOW_CONNECTIONS=false on target closes the gate against the user's |
| 128 | + // app pool refilling connections to target between our terminate and our |
| 129 | + // drop — that race is what would otherwise leave residual sessions and |
| 130 | + // make DROP DATABASE fail with "database 'target' is being accessed by |
| 131 | + // other users". |
| 132 | + // |
| 133 | + // We deliberately do not apply the same trick to the rebase database: |
| 134 | + // 1. Postgres rejects ALTER DATABASE ... ALLOW_CONNECTIONS=false on |
| 135 | + // the session's own current database (which is rebase, because the |
| 136 | + // admin connection is borrowed from rebaseDatasource) with |
| 137 | + // "cannot disallow connections for current database". |
| 138 | + // 2. Conceptually the rebase database is a passive long-lived template |
| 139 | + // that no developer workflow connects to between rebases, so the |
| 140 | + // pool-refill race the gate guards against doesn't realistically |
| 141 | + // arise on the rebase side. We still terminate other sessions on |
| 142 | + // rebase immediately before CREATE ... TEMPLATE so any opportunistic |
| 143 | + // stragglers are cleared, but the rebase side remains technically |
| 144 | + // racy — if your rebase database is being actively connected to |
| 145 | + // (unusual), close those sessions before invoking nomadRebase. |
| 146 | + withAllowConnectionsDisabled(conn, targetDbName) { |
| 147 | + terminateOtherSessions(conn, targetDbName) |
| 148 | + dropDatabaseIfExists(conn, targetDbName) |
| 149 | + terminateOtherSessions(conn, rebaseDbName) |
| 150 | + createDatabaseFromTemplate(conn, targetDbName, rebaseDbName) |
| 151 | + // The new target inherits ALLOW_CONNECTIONS from the rebase template, |
| 152 | + // which we never touched (still true). The surrounding `finally` |
| 153 | + // re-runs ALTER ... ALLOW_CONNECTIONS=true on target — a no-op in the |
| 154 | + // happy path, load-bearing if CREATE failed and the freshly-dropped |
| 155 | + // target was somehow recreated by something else in a non-true state. |
| 156 | + } |
| 157 | + finally |
| 158 | + try conn.setAutoCommit(originalAutoCommit) |
| 159 | + catch { |
| 160 | + case e: java.sql.SQLException => |
| 161 | + logger.warn( |
| 162 | + s"Could not restore autoCommit=$originalAutoCommit on rebase admin connection: ${e.getMessage}" |
| 163 | + ) |
| 164 | + } |
| 165 | + } |
| 166 | + logger.info( |
| 167 | + s"Rebase complete: database '$targetDbName' is now a fresh clone of '$rebaseDbName'." |
| 168 | + ) |
| 169 | + } |
| 170 | + |
| 171 | + /** Returns `(jdbcUrl, currentDatabaseName)` read from a short-lived connection on `ds`. */ |
| 172 | + private def readDatasourceMetadata(ds: DataSource, role: Role): (String, String) = { |
| 173 | + try |
| 174 | + Using.resource(ds.getConnection) { conn => |
| 175 | + val url = conn.getMetaData.getURL |
| 176 | + if (url == null || url.isEmpty) { |
| 177 | + throw new IllegalStateException( |
| 178 | + s"${role.label} datasource did not expose a JDBC URL via getMetaData.getURL." |
| 179 | + ) |
| 180 | + } |
| 181 | + val catalog = conn.getCatalog |
| 182 | + if (catalog == null || catalog.isEmpty) { |
| 183 | + throw new IllegalStateException( |
| 184 | + s"${role.label} datasource did not expose its database name via Connection.getCatalog." |
| 185 | + ) |
| 186 | + } |
| 187 | + (url, catalog) |
| 188 | + } |
| 189 | + catch { |
| 190 | + case e: IllegalStateException => throw e |
| 191 | + case e: java.sql.SQLException => |
| 192 | + val hint = role match { |
| 193 | + case Role.Main => |
| 194 | + "For a rebase, the target database must already exist (Nomad does not create databases)." |
| 195 | + case Role.Rebase => |
| 196 | + "Check that the rebase database exists and the rebaseDatasource configuration is correct." |
| 197 | + } |
| 198 | + throw new IllegalStateException( |
| 199 | + s"Could not connect to ${role.label} datasource. $hint Underlying error: ${e.getMessage}", |
| 200 | + e |
| 201 | + ) |
| 202 | + } |
| 203 | + } |
| 204 | + |
| 205 | + /** Parses a single `host:port` (or bare `host`) from a Postgres JDBC URL. |
| 206 | + * Rejects non-Postgres URLs and multi-host failover URLs. |
| 207 | + */ |
| 208 | + private def parseSingleHost(url: String, role: Role): String = { |
| 209 | + val prefix = "jdbc:postgresql://" |
| 210 | + if (!url.startsWith(prefix)) { |
| 211 | + throw new IllegalArgumentException( |
| 212 | + s"${role.label} datasource is not a Postgres JDBC URL (got: $url). nomadRebase requires Postgres." |
| 213 | + ) |
| 214 | + } |
| 215 | + val afterPrefix = url.substring(prefix.length) |
| 216 | + val authority = afterPrefix.takeWhile(c => c != '/' && c != '?') |
| 217 | + if (authority.contains(',')) { |
| 218 | + throw new IllegalArgumentException( |
| 219 | + s"${role.label} datasource uses a multi-host JDBC URL ($url). nomadRebase requires a single-host URL." |
| 220 | + ) |
| 221 | + } |
| 222 | + if (authority.isEmpty) { |
| 223 | + throw new IllegalArgumentException( |
| 224 | + s"${role.label} datasource URL does not include a host ($url)." |
| 225 | + ) |
| 226 | + } |
| 227 | + authority |
| 228 | + } |
| 229 | + |
| 230 | + /** Runs `body` with `ALLOW_CONNECTIONS=false` set on `dbName`, restoring `true` |
| 231 | + * in a `finally` block. The restore swallows SQL errors (logged at warn) so |
| 232 | + * the body's exception is not masked — e.g., if `body` dropped the database |
| 233 | + * outright and the restore can't find it. The new database created from a |
| 234 | + * template inherits `ALLOW_CONNECTIONS=false` from the template, so a guard |
| 235 | + * scoped to the target name covers the post-CREATE restore as well. |
| 236 | + */ |
| 237 | + private def withAllowConnectionsDisabled[A](conn: Connection, dbName: String)(body: => A): A = { |
| 238 | + setAllowConnections(conn, dbName, allow = false) |
| 239 | + try body |
| 240 | + finally |
| 241 | + try setAllowConnections(conn, dbName, allow = true) |
| 242 | + catch { |
| 243 | + case e: java.sql.SQLException => |
| 244 | + logger.warn( |
| 245 | + s"Could not restore ALLOW_CONNECTIONS=true on database '$dbName' " + |
| 246 | + s"(likely dropped or unreachable): ${e.getMessage}" |
| 247 | + ) |
| 248 | + } |
| 249 | + } |
| 250 | + |
| 251 | + private def setAllowConnections(conn: Connection, dbName: String, allow: Boolean): Unit = { |
| 252 | + Using.resource(conn.createStatement()) { stmt => |
| 253 | + val _ = stmt.execute( |
| 254 | + s"""ALTER DATABASE "${escapeIdentifier(dbName)}" WITH ALLOW_CONNECTIONS ${if (allow) "true" else "false"}""" |
| 255 | + ) |
| 256 | + } |
| 257 | + } |
| 258 | + |
| 259 | + private def terminateOtherSessions(conn: Connection, dbName: String): Unit = { |
| 260 | + Using.resource( |
| 261 | + conn.prepareStatement( |
| 262 | + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = ? AND pid <> pg_backend_pid()" |
| 263 | + ) |
| 264 | + ) { ps => |
| 265 | + ps.setString(1, dbName) |
| 266 | + Using.resource(ps.executeQuery()) { rs => |
| 267 | + // pg_terminate_backend returns false when the PID exited between the |
| 268 | + // SELECT and the signal — count only true returns so the log reflects |
| 269 | + // what was actually killed, and warn separately on the race. |
| 270 | + var succeeded = 0 |
| 271 | + var failed = 0 |
| 272 | + while (rs.next()) { |
| 273 | + if (rs.getBoolean(1)) succeeded += 1 else failed += 1 |
| 274 | + } |
| 275 | + if (succeeded > 0) { |
| 276 | + logger.info(s"Terminated $succeeded other session(s) on database '$dbName'.") |
| 277 | + } |
| 278 | + if (failed > 0) { |
| 279 | + logger.warn( |
| 280 | + s"Failed to terminate $failed session(s) on database '$dbName' " + |
| 281 | + s"(likely already exited between selection and signal)." |
| 282 | + ) |
| 283 | + } |
| 284 | + } |
| 285 | + } |
| 286 | + } |
| 287 | + |
| 288 | + private def dropDatabaseIfExists(conn: Connection, dbName: String): Unit = { |
| 289 | + if (databaseExists(conn, dbName)) { |
| 290 | + Using.resource(conn.createStatement()) { stmt => |
| 291 | + // IF EXISTS retained for the small race window between the pg_database |
| 292 | + // check above and the DROP hitting the server. |
| 293 | + val _ = stmt.execute(s"""DROP DATABASE IF EXISTS "${escapeIdentifier(dbName)}"""") |
| 294 | + } |
| 295 | + logger.info(s"Dropped database '$dbName'.") |
| 296 | + } else { |
| 297 | + logger.info(s"Database '$dbName' did not exist; skipping drop.") |
| 298 | + } |
| 299 | + } |
| 300 | + |
| 301 | + private def databaseExists(conn: Connection, dbName: String): Boolean = { |
| 302 | + Using.resource(conn.prepareStatement("SELECT 1 FROM pg_database WHERE datname = ?")) { ps => |
| 303 | + ps.setString(1, dbName) |
| 304 | + Using.resource(ps.executeQuery())(_.next()) |
| 305 | + } |
| 306 | + } |
| 307 | + |
| 308 | + private def createDatabaseFromTemplate(conn: Connection, target: String, template: String): Unit = { |
| 309 | + Using.resource(conn.createStatement()) { stmt => |
| 310 | + val _ = stmt.execute( |
| 311 | + s"""CREATE DATABASE "${escapeIdentifier(target)}" WITH TEMPLATE "${escapeIdentifier(template)}"""" |
| 312 | + ) |
| 313 | + } |
| 314 | + logger.info(s"Created database '$target' from template '$template'.") |
| 315 | + } |
| 316 | + |
| 317 | + /** Doubles double-quote characters so an identifier can be safely embedded inside `"..."`. */ |
| 318 | + private def escapeIdentifier(id: String): String = id.replace("\"", "\"\"") |
| 319 | +} |
0 commit comments