Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
4140f37
feat: add ATTACH database infrastructure to deterministic simulator
glommer Feb 19, 2026
3c50340
fix: ATTACH database write commit, schema visibility, and read locking
glommer Feb 20, 2026
dd2962a
fix: ALTER TABLE on attached databases
glommer Feb 20, 2026
53ef28e
fix: clean up attached pager state on reprepare
glommer Feb 20, 2026
cb712d4
fix: preserve database prefix in shadow ALTER TABLE RENAME
glommer Feb 20, 2026
2995199
fix: ON CONFLICT schema resolution for attached databases
glommer Feb 20, 2026
40df9a7
fix: reprepare cleanup for attached DB read transactions
glommer Feb 20, 2026
02e0c79
fix: statement savepoints and commit error propagation for attached DBs
glommer Feb 20, 2026
039cd43
fix: handle CommittingAttached state on re-entry in op_auto_commit
glommer Feb 21, 2026
77bee28
fix: clear attached DB schema cache on rollback
glommer Feb 21, 2026
7ec2e2a
fix: IntegrityCk instruction uses wrong pager for attached databases
glommer Feb 21, 2026
d02f230
fix: UPDATE with PK change on attached DB uses wrong database_id for …
glommer Feb 21, 2026
972d925
fix: check DATABASE_MANAGER registry before file lock in open_file_wi…
glommer Feb 21, 2026
8a0ae7f
fix: release attached pager read locks after main DB write commit
glommer Feb 21, 2026
e2487bc
refactor: replace database_id magic numbers with named constants
glommer Feb 21, 2026
8e8ef72
fix: invalidate rusqlite schema cache for attached databases in simul…
glommer Feb 21, 2026
db77252
Merge branch 'main' into attach-fixes-sim
glommer Feb 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 68 additions & 27 deletions core/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::util::{OpenMode, OpenOptions};
use crate::Page;
use crate::{
ast, function,
io::{MemoryIO, PlatformIO, IO},
io::{MemoryIO, IO},
parse_schema_rows, refresh_analyze_stats, translate,
util::IOExt,
vdbe, AllViewsTxState, AtomicCipherMode, AtomicSyncMode, AtomicTempStore, BusyHandler,
Expand Down Expand Up @@ -167,6 +167,18 @@ impl Drop for Connection {
}
}

// Also release WAL locks on all attached database pagers
for attached_pager in self.get_all_attached_pagers() {
if let Some(wal) = &attached_pager.wal {
if wal.holds_write_lock() {
wal.end_write_tx();
}
if wal.holds_read_lock() {
wal.end_read_tx();
}
}
}

// if connection wasn't properly closed, decrement the connection counter
self.db
.n_connections
Expand Down Expand Up @@ -1311,25 +1323,26 @@ impl Connection {
database_id: usize,
f: impl FnOnce(&mut Schema) -> T,
) -> T {
if database_id < 2 {
if !crate::is_attached_db(database_id) {
self.with_schema_mut(f)
} else {
let db = {
// For attached databases, update a connection-local copy of the schema.
// We don't update the shared db.schema until after the WAL commit, so
// other connections won't see uncommitted schema changes (which would
// cause SchemaUpdated mismatches).
let mut schemas = self.database_schemas.write();
let schema_arc = schemas.entry(database_id).or_insert_with(|| {
// Lazily copy from the shared Database schema
let attached_dbs = self.attached_databases.read();
let (db, _pager) = attached_dbs
.index_to_data
.get(&database_id)
.expect("Database ID should be valid");
db.clone()
};
let result = {
let mut schema_guard = db.schema.lock();
let schema = Arc::make_mut(&mut *schema_guard);
f(schema)
};
// Invalidate the cache so with_schema() picks up the new version
self.database_schemas.write().remove(&database_id);
result
let schema = db.schema.lock().clone();
schema
});
let schema = Arc::make_mut(schema_arc);
f(schema)
}
}

Expand Down Expand Up @@ -1397,14 +1410,22 @@ impl Connection {
let db_opts = DatabaseOpts::new()
.with_views(use_views)
.with_strict(use_strict);
// Select the IO layer for the attached database:
// - :memory: databases always get a fresh MemoryIO
// - File-based databases reuse the parent's IO when the parent is also
// file-based (important for simulator fault injection and WAL coordination)
// - If the parent is :memory: (MemoryIO) but the attached DB is file-based,
// we need a file-capable IO layer since MemoryIO can't read real files
let io: Arc<dyn IO> = if path.contains(":memory:") {
Arc::new(MemoryIO::new())
} else if self.db.path.starts_with(":memory:") {
Database::io_for_path(path)?
} else {
Arc::new(PlatformIO::new()?)
self.db.io.clone()
};
let main_db_flags = self.db.open_flags;
let db = Self::from_uri_attached(path, db_opts, main_db_flags, io)?;
let pager = Arc::new(db.init_pager(None)?);
let pager = Arc::new(db._init(None)?);
self.attached_databases.write().insert(alias, (db, pager));

Ok(())
Expand Down Expand Up @@ -1460,44 +1481,64 @@ impl Connection {
.collect()
}

/// Get all attached database (index, pager) pairs (excludes main/temp databases)
pub(crate) fn get_all_attached_pagers_with_index(&self) -> Vec<(usize, Arc<Pager>)> {
let catalog = self.attached_databases.read();
catalog
.index_to_data
.iter()
.map(|(&idx, (_db, pager))| (idx, pager.clone()))
.collect()
}

/// Connection-local cache of attached-database schemas, keyed by database
/// index. Entries may hold uncommitted schema changes made by this
/// connection's transaction; they are published to the shared Database
/// (and thus other connections) only after a successful WAL commit.
pub(crate) fn database_schemas(&self) -> &RwLock<HashMap<usize, Arc<Schema>>> {
    &self.database_schemas
}

/// Publish this connection's local schema copy for an attached database to
/// the shared Database instance. Invoked once the attached pager's WAL
/// commit has succeeded, at which point other connections may observe the
/// schema changes.
pub(crate) fn publish_attached_schema(&self, database_id: usize) {
    let mut local_schemas = self.database_schemas.write();
    // Nothing to publish if this connection made no local schema changes.
    let Some(schema) = local_schemas.remove(&database_id) else {
        return;
    };
    let catalog = self.attached_databases.read();
    if let Some((db, _pager)) = catalog.index_to_data.get(&database_id) {
        *db.schema.lock() = schema;
    }
}

/// Catalog of databases ATTACH-ed to this connection, mapping each
/// database index to its (Database, Pager) pair.
pub(crate) fn attached_databases(&self) -> &RwLock<DatabaseCatalog> {
    &self.attached_databases
}

/// Access schema for a database using a closure pattern to avoid cloning
pub(crate) fn with_schema<T>(&self, database_id: usize, f: impl FnOnce(&Schema) -> T) -> T {
if database_id == 0 {
if database_id == crate::MAIN_DB_ID {
// Main database - use connection's schema which should be kept in sync
let schema = self.schema.read();
f(&schema)
} else if database_id == 1 {
} else if database_id == crate::TEMP_DB_ID {
// Temp database - uses same schema as main for now, but this will change later.
let schema = self.schema.read();
f(&schema)
} else {
// Attached database - check cache first, then load from database
let mut schemas = self.database_schemas.write();

if let Some(cached_schema) = schemas.get(&database_id) {
return f(cached_schema);
// Attached database: prefer the connection-local copy (which may contain
// uncommitted schema changes from this connection's transaction), falling
// back to the shared Database schema (last committed state).
let schemas = self.database_schemas.read();
if let Some(local_schema) = schemas.get(&database_id) {
return f(local_schema);
}
drop(schemas);

// Schema not cached, load it lazily from the attached database
let attached_dbs = self.attached_databases.read();
let (db, _pager) = attached_dbs
.index_to_data
.get(&database_id)
.expect("Database ID should be valid after resolve_database_id");

let schema = db.schema.lock().clone();

// Cache the schema for future use
schemas.insert(database_id, schema.clone());

f(&schema)
}
}
Expand Down
60 changes: 59 additions & 1 deletion core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ pub use vdbe::{
FromValueRow, PrepareContext, PreparedProgram, Program, Register,
};

/// Index of the "main" database; SQLite always assigns it slot 0.
pub const MAIN_DB_ID: usize = 0;

/// Index of the "temp" database; SQLite always assigns it slot 1.
pub const TEMP_DB_ID: usize = 1;

/// Lowest index an ATTACH-ed database can occupy. Slots 0 and 1 are
/// reserved for "main" and "temp" respectively, so attached databases
/// are numbered starting at 2.
pub const FIRST_ATTACHED_DB_ID: usize = 2;

/// Reports whether `database_id` names an attached database, i.e. any
/// slot other than the reserved "main" and "temp" entries.
pub const fn is_attached_db(database_id: usize) -> bool {
    !matches!(database_id, MAIN_DB_ID | TEMP_DB_ID)
}

/// Configuration for database features
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct DatabaseOpts {
Expand Down Expand Up @@ -480,6 +497,42 @@ impl Database {
Self::open_file_with_flags(io, path, OpenFlags::default(), DatabaseOpts::new(), None)
}

/// Try to resolve `path` to an already-open Database via the process-wide
/// registry, validating encryption compatibility on a hit.
/// Returning the cached instance avoids opening the file (and acquiring a
/// file lock) when the database is already open in this process.
fn lookup_in_registry(
    path: &str,
    encryption_opts: &Option<EncryptionOpts>,
) -> Result<Option<Arc<Database>>> {
    // In-memory databases are never registered.
    if path.starts_with(":memory:") {
        return Ok(None);
    }

    // The registry is keyed by canonical path; quietly miss when the path
    // cannot be canonicalized or is not valid UTF-8.
    let Some(canonical) = std::fs::canonicalize(path)
        .ok()
        .and_then(|p| p.to_str().map(str::to_string))
    else {
        return Ok(None);
    };

    let registry = DATABASE_MANAGER.lock_arc();
    let Some(db) = registry.get(&canonical).and_then(Weak::upgrade) else {
        return Ok(None);
    };

    // The encryption key is deliberately not stored in the registry, so the
    // strongest check available is that an encrypted database is opened with
    // *some* encryption options (only the cipher mode is inspectable).
    let db_is_encrypted = !matches!(db.encryption_cipher_mode.get(), CipherMode::None);
    if db_is_encrypted && encryption_opts.is_none() {
        return Err(LimboError::InvalidArgument(
            "Database is encrypted but no encryption options provided".to_string(),
        ));
    }

    Ok(Some(db))
}

#[cfg(feature = "fs")]
pub fn open_file_with_flags(
io: Arc<dyn IO>,
Expand All @@ -488,6 +541,11 @@ impl Database {
opts: DatabaseOpts,
encryption_opts: Option<EncryptionOpts>,
) -> Result<Arc<Database>> {
// Check the registry before opening the file to avoid acquiring a file
// lock that would conflict with an already-open Database in this process.
if let Some(db) = Self::lookup_in_registry(path, &encryption_opts)? {
return Ok(db);
}
let file = io.open_file(path, flags, true)?;
let db_file = Arc::new(DatabaseFile::new(file));
Self::open_with_flags(io, path, db_file, flags, opts, encryption_opts)
Expand Down Expand Up @@ -878,7 +936,7 @@ impl Database {

/// Necessary Pager initialization, so that we are prepared to read from Page 1.
/// For encrypted databases, the encryption key must be provided to properly decrypt page 1.
fn _init(&self, encryption_key: Option<&EncryptionKey>) -> Result<Pager> {
pub(crate) fn _init(&self, encryption_key: Option<&EncryptionKey>) -> Result<Pager> {
let pager = self.init_pager(None)?;
pager.enable_encryption(self.opts.enable_encryption);

Expand Down
24 changes: 24 additions & 0 deletions core/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,30 @@ impl Statement {
tracing::trace!("repreparing statement");
let conn = self.program.connection.clone();

// End transactions on attached database pagers so they get a fresh view
// of the database. Without this, the pager would still see the old page 1
// with the stale schema cookie, causing an infinite SchemaUpdated loop.
// SchemaUpdated can occur at different points in the Transaction opcode,
// so the attached pager may or may not hold locks at this point.
let attached_db_ids: Vec<usize> = self
.program
.prepared
.write_databases
.iter()
.chain(self.program.prepared.read_databases.iter())
.filter(|&&id| id >= 2)
.copied()
.collect();
for db_id in attached_db_ids {
let pager = conn.get_pager_from_database_index(&db_id);
// Discard any connection-local schema changes for this attached DB
// so the re-translate reads the committed schema.
conn.database_schemas().write().remove(&db_id);
if pager.holds_read_lock() {
pager.rollback_attached();
}
}

*conn.schema.write() = conn.db.clone_schema();
self.program = {
let mut parser = Parser::new(self.program.sql.as_bytes());
Expand Down
7 changes: 7 additions & 0 deletions core/storage/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2694,6 +2694,13 @@ impl Pager {
wal.holds_read_lock()
}

/// Whether this pager's WAL currently holds the write lock.
/// A pager without a WAL never holds it.
pub fn holds_write_lock(&self) -> bool {
    self.wal
        .as_ref()
        .map_or(false, |wal| wal.holds_write_lock())
}

/// Rollback and clean up an attached database pager's transaction.
/// Unlike rollback_tx, this doesn't modify connection-level state.
pub fn rollback_attached(&self) {
Expand Down
1 change: 0 additions & 1 deletion core/storage/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,6 @@ impl Wal for WalFile {
/// End a write transaction
#[instrument(skip_all, level = Level::DEBUG)]
fn end_write_tx(&self) {
tracing::debug!("end_write_txn");
turso_assert!(
self.write_lock_held
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
Expand Down
22 changes: 13 additions & 9 deletions core/translate/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ fn emit_add_column_default_type_validation(
program.emit_insn(Insn::OpenRead {
cursor_id: check_cursor_id,
root_page: original_btree.root_page,
db: 0,
db: crate::MAIN_DB_ID,
});

let skip_check_label = program.allocate_label();
Expand Down Expand Up @@ -358,12 +358,18 @@ pub fn translate_alter_table(
body: alter_table,
} = alter;
let database_id = resolver.resolve_database_id(&qualified_name)?;
if database_id >= 2 {
if crate::is_attached_db(database_id) {
let schema_cookie = resolver.with_schema(database_id, |s| s.schema_version);
program.begin_write_on_database(database_id, schema_cookie);
}
program.begin_write_operation();
let table_name = qualified_name.name.as_str();
// For attached databases, qualify sqlite_schema with the database name
// so that the UPDATE targets the correct database's schema table.
let qualified_schema_table = match &qualified_name.db_name {
Some(db_name) => format!("{}.{}", db_name.as_str(), SQLITE_TABLEID),
None => SQLITE_TABLEID.to_string(),
};
let schema_version = resolver.with_schema(database_id, |s| s.schema_version);
validate(&alter_table, table_name)?;

Expand Down Expand Up @@ -573,7 +579,7 @@ pub fn translate_alter_table(

let stmt = format!(
r#"
UPDATE {SQLITE_TABLEID}
UPDATE {qualified_schema_table}
SET sql = '{sql}'
WHERE name = '{table_name}' COLLATE NOCASE AND type = 'table'
"#,
Expand Down Expand Up @@ -823,7 +829,7 @@ pub fn translate_alter_table(

let stmt = format!(
r#"
UPDATE {SQLITE_TABLEID}
UPDATE {qualified_schema_table}
SET sql = '{escaped}'
WHERE name = '{table_name}' COLLATE NOCASE AND type = 'table'
"#,
Expand Down Expand Up @@ -954,8 +960,7 @@ pub fn translate_alter_table(
};

let sqlite_schema = resolver
.schema()
.get_btree_table(SQLITE_TABLEID)
.with_schema(database_id, |s| s.get_btree_table(SQLITE_TABLEID))
.ok_or_else(|| {
LimboError::ParseError("sqlite_schema table not found in schema".to_string())
})?;
Expand Down Expand Up @@ -1232,8 +1237,7 @@ pub fn translate_alter_table(
}

let sqlite_schema = resolver
.schema()
.get_btree_table(SQLITE_TABLEID)
.with_schema(database_id, |s| s.get_btree_table(SQLITE_TABLEID))
.ok_or_else(|| {
LimboError::ParseError("sqlite_schema table not found in schema".to_string())
})?;
Expand Down Expand Up @@ -1315,7 +1319,7 @@ pub fn translate_alter_table(
let escaped_sql = new_sql.replace('\'', "''");
let update_stmt = format!(
r#"
UPDATE {SQLITE_TABLEID}
UPDATE {qualified_schema_table}
SET sql = '{escaped_sql}'
WHERE name = '{trigger_name}' COLLATE NOCASE AND type = 'trigger'
"#,
Expand Down
2 changes: 1 addition & 1 deletion core/translate/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn translate_delete(
crate::bail_parse_error!("table {} may not be modified", tbl_name);
}

if database_id >= 2 {
if crate::is_attached_db(database_id) {
let schema_cookie = resolver.with_schema(database_id, |s| s.schema_version);
program.begin_write_on_database(database_id, schema_cookie);
}
Expand Down
Loading
Loading