diff --git a/examples/basic_query.rs b/examples/basic_query.rs index e70cf60..5fe5be4 100644 --- a/examples/basic_query.rs +++ b/examples/basic_query.rs @@ -1,19 +1,30 @@ -//! Basic DuckLake query example +//! Basic DuckLake query example with snapshot isolation //! //! This example demonstrates how to: //! 1. Create a DuckLake catalog from a DuckDB catalog file -//! 2. Register it with DataFusion -//! 3. Execute a simple SELECT query +//! 2. Bind the catalog to a specific snapshot for query consistency +//! 3. Register it with DataFusion +//! 4. Execute a simple SELECT query +//! +//! ## Snapshot Isolation +//! +//! Each DuckLake catalog is bound to a specific snapshot ID at creation time. +//! This guarantees that all queries through that catalog see a consistent view +//! of the data, even if multiple schema/table lookups happen during query planning +//! or if the underlying data changes. +//! +//! To query data at different points in time, create separate catalogs bound to +//! different snapshot IDs. //! //! To run this example, you need: //! - A DuckDB database file with DuckLake tables //! - Parquet data files referenced by the catalog //! -//! Usage: cargo run --example basic_query +//! Usage: cargo run --example basic_query use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::*; -use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider}; +use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider}; use object_store::ObjectStore; use object_store::aws::AmazonS3Builder; use std::env; @@ -37,7 +48,12 @@ async fn main() -> Result<(), Box> { println!("Connecting to DuckLake catalog: {}", catalog_path); // Create the metadata provider - let provider = DuckdbMetadataProvider::new(catalog_path)?; + let provider = Arc::new(DuckdbMetadataProvider::new(catalog_path)?); + + // Get the current snapshot ID + // This ensures query consistency - all metadata lookups will use this snapshot + let snapshot_id = provider.get_current_snapshot()?; + println!("Current snapshot ID: {}", snapshot_id); // Create runtime and register object stores // For MinIO or S3, register the object store with the runtime @@ -56,8 +72,14 @@ async fn main() -> Result<(), Box> { ); runtime.register_object_store(&Url::parse("s3://ducklake-data/")?, s3); - // Create the DuckLake catalog - let ducklake_catalog = DuckLakeCatalog::new(provider)?; + // Create the DuckLake catalog bound to the snapshot + // This ensures all queries through this catalog see consistent data + // from this specific snapshot, even if the underlying data changes + let ducklake_catalog = DuckLakeCatalog::with_snapshot(provider, snapshot_id)?; + + // Alternative: Use the backward-compatible constructor that automatically + // binds to the current snapshot: + // let ducklake_catalog = DuckLakeCatalog::new(DuckdbMetadataProvider::new(catalog_path)?)?; println!("✓ Connected to DuckLake catalog"); diff --git a/src/catalog.rs b/src/catalog.rs index 3d3fbad..4d7845c 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -14,10 +14,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl; /// /// Connects to a DuckLake catalog database and provides access to schemas and tables. /// Uses dynamic metadata lookup - schemas are queried on-demand from the catalog database. +/// Bound to a specific snapshot ID for query consistency. #[derive(Debug)] pub struct DuckLakeCatalog { /// Metadata provider for querying catalog provider: Arc, + /// Snapshot ID this catalog is bound to (for query consistency) + snapshot_id: i64, /// Object store URL for resolving file paths (e.g., s3://bucket/ or file:///) object_store_url: Arc, /// Catalog base path component for resolving relative schema paths (e.g., /prefix/) @@ -26,22 +29,38 @@ pub struct DuckLakeCatalog { impl DuckLakeCatalog { /// Create a new DuckLake catalog with a metadata provider + /// + /// Gets the current snapshot ID at creation time and binds the catalog to it. + /// For backward compatibility. For explicit snapshot control, use `with_snapshot()`. pub fn new(provider: impl MetadataProvider + 'static) -> Result { let provider = Arc::new(provider) as Arc; + let snapshot_id = provider.get_current_snapshot()?; let data_path = provider.get_data_path()?; let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?; Ok(Self { provider, + snapshot_id, object_store_url: Arc::new(object_store_url), catalog_path, }) } - fn get_current_snapshot_id(&self) -> Result { - self.provider - .get_current_snapshot() - .inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot")) + /// Create a catalog bound to a specific snapshot ID + /// + /// All schemas and tables returned will use this snapshot, guaranteeing + /// query consistency even if multiple catalog/schema/table lookups occur + /// during query planning. + pub fn with_snapshot(provider: Arc, snapshot_id: i64) -> Result { + let data_path = provider.get_data_path()?; + let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?; + + Ok(Self { + provider, + snapshot_id, + object_store_url: Arc::new(object_store_url), + catalog_path, + }) } } @@ -51,14 +70,9 @@ impl CatalogProvider for DuckLakeCatalog { } fn schema_names(&self) -> Vec { - let snapshot_id = match self.get_current_snapshot_id() { - Ok(id) => id, - Err(_) => return Vec::new(), - }; - - // Query database with snapshot_id + // Use the catalog's pinned snapshot_id self.provider - .list_schemas(snapshot_id) + .list_schemas(self.snapshot_id) .unwrap_or_default() .into_iter() .map(|s| s.schema_name) @@ -66,13 +80,8 @@ impl CatalogProvider for DuckLakeCatalog { } fn schema(&self, name: &str) -> Option> { - let snapshot_id = match self.get_current_snapshot_id() { - Ok(id) => id, - Err(_) => return None, - }; - - // Query database with snapshot_id - match self.provider.get_schema_by_name(name, snapshot_id) { + // Use the catalog's pinned snapshot_id + match self.provider.get_schema_by_name(name, self.snapshot_id) { Ok(Some(meta)) => { // Resolve schema path hierarchically let schema_path = if meta.path_is_relative { @@ -83,12 +92,12 @@ impl CatalogProvider for DuckLakeCatalog { meta.path }; - // Pass snapshot_id to schema + // Pass the pinned snapshot_id to schema Some(Arc::new(DuckLakeSchema::new( meta.schema_id, meta.schema_name, Arc::clone(&self.provider), - snapshot_id, // Propagate snapshot_id + self.snapshot_id, // Propagate pinned snapshot_id self.object_store_url.clone(), schema_path, )) as Arc)