Skip to content

Commit 0c51676

Browse files
committed
Make DuckDB database attachments logic more robust
1 parent c371d52 commit 0c51676

1 file changed

Lines changed: 21 additions & 9 deletions

File tree

src/sql/db_connection_pool/dbconnection/duckdbconn.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,29 @@ impl DuckDBAttachments {
123123
}
124124

125125
/// Attaches the databases to the given connection and sets the search path for the newly attached databases.
126+
/// If connection already contains attachments, it will skip the attachments override (including search_path).
126127
///
127128
/// # Errors
128129
///
129130
/// Returns an error if a specific attachment is missing, cannot be attached, search path cannot be set or the connection fails.
130131
pub fn attach(&self, conn: &Connection) -> Result<()> {
132+
133+
// Check if attachments already exist, skip it in this case
134+
let mut stmt = conn
135+
.prepare("PRAGMA database_list;")
136+
.context(DuckDBConnectionSnafu)?;
137+
let mut rows = stmt.query([]).context(DuckDBConnectionSnafu)?;
138+
139+
while let Some(row) = rows.next()? {
140+
let ds_name: String = row.get(1)?;
141+
if ds_name.starts_with("attachment_") {
142+
tracing::debug!(
143+
"Connection contains existing attachments, skipping attachments override",
144+
);
145+
return Ok(());
146+
}
147+
}
148+
131149
for (i, db) in self.attachments.iter().enumerate() {
132150
// check the db file exists
133151
std::fs::metadata(db.as_ref()).context(UnableToAttachDatabaseSnafu {
@@ -154,7 +172,7 @@ impl DuckDBAttachments {
154172
pub fn detach(&self, conn: &Connection) -> Result<()> {
155173
for (i, _) in self.attachments.iter().enumerate() {
156174
conn.execute(
157-
&format!("DETACH {}", Self::get_attachment_name(&self.random_id, i)),
175+
&format!("DETACH DATABASE IF EXISTS {}", Self::get_attachment_name(&self.random_id, i)),
158176
[],
159177
)
160178
.context(DuckDBConnectionSnafu)?;
@@ -320,7 +338,9 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
320338
) -> Result<SendableRecordBatchStream> {
321339
let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel::<RecordBatch>(4);
322340

341+
let conn = self.conn.try_clone()?;
323342
Self::attach(&self.conn, &self.attachments)?;
343+
324344
let fetch_schema_sql =
325345
format!("WITH fetch_schema AS ({sql}) SELECT * FROM fetch_schema LIMIT 0");
326346
let mut stmt = self
@@ -334,21 +354,15 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
334354
.boxed()
335355
.context(super::UnableToGetSchemaSnafu)?;
336356

337-
Self::detach(&self.conn, &self.attachments)?;
338-
339357
let schema = result.get_schema();
340358

341359
let params = params.iter().map(dyn_clone::clone).collect::<Vec<_>>();
342360

343-
let conn = self.conn.try_clone()?; // try_clone creates a new connection to the same database
344-
// this creates a new connection session, requiring resetting the ATTACHments and search_path
345361
let sql = sql.to_string();
346362

347363
let cloned_schema = schema.clone();
348-
let attachments = self.attachments.clone();
349364

350365
let join_handle = tokio::task::spawn_blocking(move || {
351-
Self::attach(&conn, &attachments)?; // this attach could happen when we clone the connection, but we can't detach after the thread closes because the connection isn't thread safe
352366
let mut stmt = conn.prepare(&sql).context(DuckDBQuerySnafu)?;
353367
let params: &[&dyn ToSql] = &params
354368
.iter()
@@ -360,8 +374,6 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
360374
for i in result {
361375
blocking_channel_send(&batch_tx, i)?;
362376
}
363-
364-
Self::detach(&conn, &attachments)?;
365377
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(())
366378
});
367379

0 commit comments

Comments
 (0)