Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The codebase follows a layered architecture with clear separation of concerns:
- Also provides individual lookup methods: `get_schema_by_name()`, `get_table_by_name()`, and `table_exists()`
- `DuckdbMetadataProvider` implements the trait using DuckDB as the catalog backend
- Executes SQL queries against standard DuckLake catalog tables (`ducklake_snapshot`, `ducklake_schema`, `ducklake_table`, `ducklake_column`, `ducklake_data_file`, `ducklake_delete_file`, `ducklake_metadata`)
- Thread-safe: Opens a new read-only connection for each query
- Thread-safe: Uses a single shared read-only connection protected by a `Mutex`; this avoids opening a connection per query, at the cost of metadata queries running one at a time
- Supports delete files: `get_table_files_for_select()` returns data files with associated delete files

2. **DataFusion Integration Layer** (`src/catalog.rs`, `src/schema.rs`, `src/table.rs`)
Expand Down
53 changes: 29 additions & 24 deletions src/metadata_provider_duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,40 @@ use crate::metadata_provider::{
};
use duckdb::AccessMode::ReadOnly;
use duckdb::{Config, Connection, params};
use std::sync::{Arc, Mutex, MutexGuard};

/// DuckDB-backed metadata provider.
///
/// Holds one read-only DuckDB connection behind an `Arc<Mutex<_>>` instead of
/// opening a new connection for every metadata query. Clones of the provider
/// share that same connection, and the mutex lets only one caller use it at a
/// time, so concurrent metadata queries are serialized.
#[derive(Debug, Clone)]
pub struct DuckdbMetadataProvider {
/// Shared connection; the mutex must be locked to run a query.
conn: Arc<Mutex<Connection>>,
/// Path to the catalog database, retained for logging/debugging
#[allow(dead_code)]
catalog_path: String,
}

impl DuckdbMetadataProvider {
/// Create a new DuckDB metadata provider for the catalog at `catalog_path`.
///
/// The connection is opened here, once, and stored behind `Arc<Mutex<_>>`
/// for reuse by later metadata queries.
///
/// # Errors
///
/// Returns an error if the connection to `catalog_path` cannot be opened.
pub fn new(catalog_path: impl Into<String>) -> crate::Result<Self> {
let catalog_path = catalog_path.into();

// Validate connection works
// NOTE(review): the `_conn` line below appears to be the removed side of the
// diff hunk; the `create_connection` line after it is its replacement — confirm.
let _conn = DuckdbMetadataProvider::open_connection_with_path(&catalog_path)?;
let conn = Self::create_connection(&catalog_path)?;

Ok(Self {
conn: Arc::new(Mutex::new(conn)),
catalog_path,
})
}

fn open_connection(&self) -> crate::Result<Connection> {
DuckdbMetadataProvider::open_connection_with_path(&self.catalog_path)
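/// Lock the shared connection and return the guard.
///
/// The lock is held until the returned guard is dropped, so other callers
/// block for as long as the caller keeps it.
///
/// # Panics
///
/// Panics if the mutex is poisoned, i.e. another thread panicked while
/// holding the lock.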
fn connection(&self) -> MutexGuard<'_, Connection> {
self.conn.lock().expect("DuckDB connection mutex poisoned")
}

/// Open a connection to the catalog database
fn open_connection_with_path(catalog_path: &str) -> crate::Result<Connection> {
/// Create a new read-only connection to the catalog database
fn create_connection(catalog_path: &str) -> crate::Result<Connection> {
let config = Config::default().access_mode(ReadOnly)?;
match Connection::open_with_flags(catalog_path, config) {
Ok(con) => Ok(con),
Expand All @@ -63,19 +68,19 @@ impl DuckdbMetadataProvider {

impl MetadataProvider for DuckdbMetadataProvider {
/// Returns the snapshot id produced by `SQL_GET_LATEST_SNAPSHOT`.
///
/// The query takes no parameters; column 0 of its row is read as an `i64`.
/// Any query error is propagated to the caller.
fn get_current_snapshot(&self) -> crate::Result<i64> {
// NOTE(review): the `open_connection` line appears to be the removed side of
// the diff hunk; the `connection()` line after it is its replacement — confirm.
let conn = self.open_connection()?;
let conn = self.connection();
let snapshot_id: i64 = conn.query_row(SQL_GET_LATEST_SNAPSHOT, [], |row| row.get(0))?;
Ok(snapshot_id)
}

/// Returns the data path produced by `SQL_GET_DATA_PATH`.
///
/// The query takes no parameters; column 0 of its row is read as a `String`.
/// Any query error is propagated to the caller.
fn get_data_path(&self) -> crate::Result<String> {
// NOTE(review): the `open_connection` line appears to be the removed side of
// the diff hunk; the `connection()` line after it is its replacement — confirm.
let conn = self.open_connection()?;
let conn = self.connection();
let data_path: String = conn.query_row(SQL_GET_DATA_PATH, [], |row| row.get(0))?;
Ok(data_path)
}

fn list_snapshots(&self) -> crate::Result<Vec<SnapshotMetadata>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_SNAPSHOTS)?;

let snapshots = stmt
Expand All @@ -93,7 +98,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn list_schemas(&self, snapshot_id: i64) -> crate::Result<Vec<SchemaMetadata>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_SCHEMAS)?;

let schemas = stmt
Expand All @@ -115,7 +120,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn list_tables(&self, schema_id: i64, snapshot_id: i64) -> crate::Result<Vec<TableMetadata>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_TABLES)?;

let tables = stmt
Expand All @@ -137,7 +142,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn get_table_structure(&self, table_id: i64) -> crate::Result<Vec<DuckLakeTableColumn>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_TABLE_COLUMNS)?;

let columns = stmt
Expand All @@ -163,7 +168,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
table_id: i64,
snapshot_id: i64,
) -> crate::Result<Vec<DuckLakeTableFile>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_DATA_FILES)?;

let files = stmt
Expand Down Expand Up @@ -214,7 +219,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
name: &str,
snapshot_id: i64,
) -> crate::Result<Option<SchemaMetadata>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_SCHEMA_BY_NAME)?;

let mut rows = stmt.query(params![name, snapshot_id, snapshot_id])?;
Expand All @@ -241,7 +246,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
name: &str,
snapshot_id: i64,
) -> crate::Result<Option<TableMetadata>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_TABLE_BY_NAME)?;

let mut rows = stmt.query(params![&schema_id, &name, &snapshot_id, &snapshot_id])?;
Expand All @@ -263,7 +268,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn table_exists(&self, schema_id: i64, name: &str, snapshot_id: i64) -> crate::Result<bool> {
let conn = self.open_connection()?;
let conn = self.connection();
let exists: bool = conn.query_row(
SQL_TABLE_EXISTS,
params![schema_id, &name, &snapshot_id, &snapshot_id],
Expand All @@ -273,7 +278,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn list_all_tables(&self, snapshot_id: i64) -> crate::Result<Vec<TableWithSchema>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_ALL_TABLES)?;

let tables = stmt
Expand All @@ -299,7 +304,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn list_all_columns(&self, snapshot_id: i64) -> crate::Result<Vec<ColumnWithTable>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_ALL_COLUMNS)?;

let columns = stmt
Expand Down Expand Up @@ -328,7 +333,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
}

fn list_all_files(&self, snapshot_id: i64) -> crate::Result<Vec<FileWithTable>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_ALL_FILES)?;

let files = stmt
Expand Down Expand Up @@ -396,7 +401,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
start_snapshot: i64,
end_snapshot: i64,
) -> crate::Result<Vec<DataFileChange>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS)?;

let files = stmt
Expand All @@ -421,7 +426,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
start_snapshot: i64,
end_snapshot: i64,
) -> crate::Result<Vec<DeleteFileChange>> {
let conn = self.open_connection()?;
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS)?;

let files = stmt
Expand Down
2 changes: 1 addition & 1 deletion tests/concurrent_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! ## Thread Safety Guarantees
//!
//! The DuckLake implementation is designed to be thread-safe:
//! - **MetadataProvider**: Opens a new read-only DuckDB connection per query
//! - **MetadataProvider**: Uses a single shared read-only connection protected by a Mutex (metadata queries run one at a time)
//! - **Catalog/Schema**: Dynamic metadata lookup with no shared mutable state
//! - **Table**: Immutable metadata cached at creation time
//! - **ObjectStore**: DataFusion's object stores are Arc<dyn ObjectStore> (thread-safe)
Expand Down
Loading