Skip to content

Commit 8b2d747

Browse files
authored
fix: make DuckDB attachments logic more robust (#509)
1 parent 373b121 commit 8b2d747

5 files changed

Lines changed: 99 additions & 37 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ native-tls = { version = "0.2.11", optional = true }
6161
num-bigint = { workspace = true }
6262
num-traits = { version = "0.2", optional = true }
6363
odbc-api = { version = "19.0", optional = true }
64+
once_cell = "1.21"
6465
pem = { version = "3.0.4", optional = true }
6566
postgres-native-tls = { version = "0.5.0", optional = true }
6667
prost = { workspace = true, optional = true }

core/src/duckdb.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,10 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
435435
Mode::File => {
436436
let read_pool = pool.clone();
437437

438-
read_pool.set_attached_databases(&self.attach_databases(&options))
438+
read_pool
439+
.set_attached_databases(&self.attach_databases(&options))
440+
.context(DbConnectionPoolSnafu)
441+
.map_err(to_datafusion_error)?
439442
}
440443
Mode::Memory => pool.clone(),
441444
};

core/src/sql/db_connection_pool/dbconnection/duckdbconn.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use duckdb::ToSql;
1818
use duckdb::{Connection, DuckdbConnectionManager};
1919
use dyn_clone::DynClone;
2020
use rand::distr::{Alphanumeric, SampleString};
21+
use once_cell::sync::OnceCell;
2122
use snafu::{prelude::*, ResultExt};
2223
use tokio::sync::mpsc::Sender;
2324

@@ -63,11 +64,15 @@ impl<T: ToSql + Sync + Send + DynClone> DuckDBSyncParameter for T {
6364
dyn_clone::clone_trait_object!(DuckDBSyncParameter);
6465
pub type DuckDBParameter = Box<dyn DuckDBSyncParameter>;
6566

67+
/// Configuration and state for attaching external DuckDB databases.
6668
#[derive(Debug)]
6769
pub struct DuckDBAttachments {
6870
attachments: HashSet<Arc<str>>,
6971
random_id: String,
7072
main_db: String,
73+
/// Cached search_path after first successful attachments initialization.
74+
/// Uses OnceCell to ensure attachment happens exactly once.
75+
search_path_cache: OnceCell<Arc<str>>,
7176
}
7277

7378
impl DuckDBAttachments {
@@ -80,9 +85,16 @@ impl DuckDBAttachments {
8085
attachments,
8186
random_id,
8287
main_db: main_db.to_string(),
88+
search_path_cache: OnceCell::new(),
8389
}
8490
}
8591

92+
/// Returns a reference to the set of database paths to attach.
93+
#[must_use]
94+
pub fn attachments(&self) -> &HashSet<Arc<str>> {
95+
&self.attachments
96+
}
97+
8698
/// Returns the search path for the given database and attachments.
8799
/// The given database needs to be included separately, as search path by default do not include the main database.
88100
/// The `attachments` parameter represents full attachment names, e.g., ["attachment_zCVN0zYJ_0", ...]
@@ -213,6 +225,23 @@ impl DuckDBAttachments {
213225
fn get_attachment_name(random_id: &str, index: usize) -> String {
214226
format!("attachment_{random_id}_{index}")
215227
}
228+
229+
/// Lazily attaches databases on first call, then applies search_path to the connection.
230+
/// Uses cached search_path on subsequent calls.
231+
///
232+
/// # Errors
233+
///
234+
/// Returns an error if attachment or search_path setting fails.
235+
pub fn attach_once(&self, conn: &Connection) -> Result<()> {
236+
let search_path = self.search_path_cache.get_or_try_init(|| {
237+
self.attach(conn)
238+
})?;
239+
240+
conn.execute(&format!("SET search_path = '{}'", search_path), [])
241+
.context(DuckDBConnectionSnafu)?;
242+
243+
Ok(())
244+
}
216245
}
217246

218247
pub struct DuckDbConnection {
@@ -266,7 +295,7 @@ impl DuckDbConnection {
266295
) -> &mut r2d2::PooledConnection<DuckdbConnectionManager> {
267296
&mut self.conn
268297
}
269-
298+
270299
#[must_use]
271300
pub fn with_unsupported_type_action(
272301
mut self,
@@ -429,7 +458,11 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
429458
let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel::<RecordBatch>(4);
430459

431460
let conn = self.conn.try_clone()?;
432-
Self::attach(&conn, &self.attachments)?;
461+
462+
if let Some(attachments) = &self.attachments {
463+
attachments.attach_once(&conn)?;
464+
}
465+
433466
self.apply_connection_setup_queries(&conn)?;
434467

435468
let fetch_schema_sql =

core/src/sql/db_connection_pool/duckdbpool.rs

Lines changed: 58 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use duckdb::{vtab::arrow::ArrowVTab, AccessMode, DuckdbConnectionManager};
3+
use once_cell::sync::OnceCell;
34
use snafu::{prelude::*, ResultExt};
45
use std::sync::Arc;
56

@@ -119,7 +120,7 @@ impl DuckDbConnectionPoolBuilder {
119120
path: ":memory:".into(),
120121
pool,
121122
join_push_down: JoinPushDown::AllowedFor(":memory:".to_string()),
122-
attached_databases: Vec::new(),
123+
attached_databases: Arc::new(OnceCell::new()),
123124
mode: Mode::Memory,
124125
unsupported_type_action: UnsupportedTypeAction::Error,
125126
connection_setup_queries: self.connection_setup_queries,
@@ -160,7 +161,7 @@ impl DuckDbConnectionPoolBuilder {
160161
pool,
161162
// Allow join-push down for any other instances that connect to the same underlying file.
162163
join_push_down: JoinPushDown::AllowedFor(self.path),
163-
attached_databases: Vec::new(),
164+
attached_databases: Arc::new(OnceCell::new()),
164165
mode: Mode::File,
165166
unsupported_type_action: UnsupportedTypeAction::Error,
166167
connection_setup_queries: self.connection_setup_queries,
@@ -180,7 +181,8 @@ pub struct DuckDbConnectionPool {
180181
path: Arc<str>,
181182
pool: Arc<r2d2::Pool<DuckdbConnectionManager>>,
182183
join_push_down: JoinPushDown,
183-
attached_databases: Vec<Arc<str>>,
184+
/// Shared across clones. Initialized once, first set of attached databases wins.
185+
attached_databases: Arc<OnceCell<Arc<DuckDBAttachments>>>,
184186
mode: Mode,
185187
unsupported_type_action: UnsupportedTypeAction,
186188
connection_setup_queries: Vec<Arc<str>>,
@@ -191,7 +193,7 @@ impl std::fmt::Debug for DuckDbConnectionPool {
191193
f.debug_struct("DuckDbConnectionPool")
192194
.field("path", &self.path)
193195
.field("join_push_down", &self.join_push_down)
194-
.field("attached_databases", &self.attached_databases)
196+
.field("attached_databases", &self.attached_databases.get())
195197
.field("mode", &self.mode)
196198
.field("unsupported_type_action", &self.unsupported_type_action)
197199
.finish()
@@ -254,19 +256,58 @@ impl DuckDbConnectionPool {
254256
self
255257
}
256258

257-
#[must_use]
258-
pub fn set_attached_databases(mut self, databases: &[Arc<str>]) -> Self {
259-
self.attached_databases = databases.to_vec();
260-
259+
/// Sets the databases to attach for cross-database queries.
260+
/// Attachments are performed lazily on first query using `OnceCell`.
261+
///
262+
/// If attachments are already configured with the same databases, this is a no-op.
263+
/// If attachments are already configured with different databases, a warning is logged
264+
/// and the existing attachments are preserved.
265+
///
266+
/// # Errors
267+
///
268+
/// Returns an error if unable to extract database name from path.
269+
pub fn set_attached_databases(mut self, databases: &[Arc<str>]) -> Result<Self> {
261270
if !databases.is_empty() {
262-
let mut paths = self.attached_databases.clone();
271+
let mut paths: Vec<Arc<str>> = databases.to_vec();
263272
paths.push(Arc::clone(&self.path));
264273
paths.sort();
265-
let push_down_context = paths.join(";");
274+
let push_down_context = paths
275+
.iter()
276+
.map(|p| p.as_ref())
277+
.collect::<Vec<_>>()
278+
.join(";");
266279
self.join_push_down = JoinPushDown::AllowedFor(push_down_context);
267280
}
268281

269-
self
282+
let db_name = extract_db_name(Arc::clone(&self.path))?;
283+
let new_set: std::collections::HashSet<Arc<str>> = databases.iter().cloned().collect();
284+
let path = Arc::clone(&self.path);
285+
286+
let existing = self.attached_databases.get_or_init(|| {
287+
tracing::debug!(
288+
"pool_path = {}, db_name = {}, databases = {:?}, set_attached_databases: creating new DuckDBAttachments",
289+
path, db_name, databases
290+
);
291+
Arc::new(DuckDBAttachments::new(&db_name, databases))
292+
});
293+
294+
// Check if the existing attachments match what was requested
295+
let existing_set = existing.attachments();
296+
if *existing_set != new_set {
297+
tracing::warn!(
298+
"Unable to reconfigure DuckDB attachments for database {}: attachments are already configured with a different set of databases. \
299+
Existing: {existing_set:?}, Requested: {new_set:?}. Keeping existing attachments.",
300+
self.path
301+
);
302+
}
303+
304+
Ok(self)
305+
}
306+
307+
/// Returns the attachments configuration.
308+
#[must_use]
309+
pub fn get_attachments(&self) -> Option<&Arc<DuckDBAttachments>> {
310+
self.attached_databases.get()
270311
}
271312

272313
#[must_use]
@@ -289,16 +330,14 @@ impl DuckDbConnectionPool {
289330
let conn: r2d2::PooledConnection<DuckdbConnectionManager> =
290331
pool.get().context(ConnectionPoolSnafu)?;
291332

292-
let attachments = self.get_attachments()?;
293-
294333
for query in self.connection_setup_queries.iter() {
295334
tracing::debug!("DuckDB connection setup: {}", query);
296335
conn.execute(query, []).context(DuckDBConnectionSnafu)?;
297336
}
298337

299338
Ok(Box::new(
300339
DuckDbConnection::new(conn)
301-
.with_attachments(attachments)
340+
.with_attachments(self.attached_databases.get().cloned())
302341
.with_connection_setup_queries(self.connection_setup_queries.clone())
303342
.with_unsupported_type_action(self.unsupported_type_action),
304343
))
@@ -308,21 +347,6 @@ impl DuckDbConnectionPool {
308347
pub fn mode(&self) -> Mode {
309348
self.mode
310349
}
311-
312-
pub fn get_attachments(&self) -> Result<Option<Arc<DuckDBAttachments>>> {
313-
if self.attached_databases.is_empty() {
314-
Ok(None)
315-
} else {
316-
#[cfg(not(feature = "duckdb-federation"))]
317-
return Ok(None);
318-
319-
#[cfg(feature = "duckdb-federation")]
320-
Ok(Some(Arc::new(DuckDBAttachments::new(
321-
&extract_db_name(Arc::clone(&self.path))?,
322-
&self.attached_databases,
323-
))))
324-
}
325-
}
326350
}
327351

328352
#[async_trait]
@@ -338,16 +362,14 @@ impl DbConnectionPool<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
338362
let conn: r2d2::PooledConnection<DuckdbConnectionManager> =
339363
pool.get().context(ConnectionPoolSnafu)?;
340364

341-
let attachments = self.get_attachments()?;
342-
343365
for query in self.connection_setup_queries.iter() {
344366
tracing::debug!("DuckDB connection setup: {}", query);
345367
conn.execute(query, []).context(DuckDBConnectionSnafu)?;
346368
}
347369

348370
Ok(Box::new(
349371
DuckDbConnection::new(conn)
350-
.with_attachments(attachments)
372+
.with_attachments(self.attached_databases.get().cloned())
351373
.with_connection_setup_queries(self.connection_setup_queries.clone())
352374
.with_unsupported_type_action(self.unsupported_type_action),
353375
))
@@ -438,12 +460,14 @@ mod test {
438460
let db_attached_name = random_db_name();
439461
let pool = DuckDbConnectionPool::new_file(&db_base_name, &AccessMode::ReadWrite)
440462
.expect("DuckDB connection pool to be created")
441-
.set_attached_databases(&[Arc::from(db_attached_name.as_str())]);
463+
.set_attached_databases(&[Arc::from(db_attached_name.as_str())])
464+
.expect("Attached databases should be set");
442465

443466
let pool_attached =
444467
DuckDbConnectionPool::new_file(&db_attached_name, &AccessMode::ReadWrite)
445468
.expect("DuckDB connection pool to be created")
446-
.set_attached_databases(&[Arc::from(db_base_name.as_str())]);
469+
.set_attached_databases(&[Arc::from(db_base_name.as_str())])
470+
.expect("Attached databases should be set");
447471

448472
let conn = pool
449473
.pool

0 commit comments

Comments
 (0)