Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions examples/basic_query.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
//! Basic DuckLake query example
//! Basic DuckLake query example with snapshot isolation
//!
//! This example demonstrates how to:
//! 1. Create a DuckLake catalog from a DuckDB catalog file
//! 2. Register it with DataFusion
//! 3. Execute a simple SELECT query
//! 2. Bind the catalog to a specific snapshot for query consistency
//! 3. Register it with DataFusion
//! 4. Execute a simple SELECT query
//!
//! ## Snapshot Isolation
//!
//! Each DuckLake catalog is bound to a specific snapshot ID at creation time.
//! This guarantees that all queries through that catalog see a consistent view
//! of the data, even if multiple schema/table lookups happen during query planning
//! or if the underlying data changes.
//!
//! To query data at different points in time, create separate catalogs bound to
//! different snapshot IDs.
//!
//! To run this example, you need:
//! - A DuckDB database file with DuckLake tables
//! - Parquet data files referenced by the catalog
//!
//! Usage: cargo run --example basic_query
//! Usage: cargo run --example basic_query <catalog.db> <sql>

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::*;
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider};
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider};
use object_store::ObjectStore;
use object_store::aws::AmazonS3Builder;
use std::env;
Expand All @@ -37,7 +48,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Connecting to DuckLake catalog: {}", catalog_path);

// Create the metadata provider
let provider = DuckdbMetadataProvider::new(catalog_path)?;
let provider = Arc::new(DuckdbMetadataProvider::new(catalog_path)?);

// Get the current snapshot ID
// This ensures query consistency - all metadata lookups will use this snapshot
let snapshot_id = provider.get_current_snapshot()?;
println!("Current snapshot ID: {}", snapshot_id);

// Create runtime and register object stores
// For MinIO or S3, register the object store with the runtime
Expand All @@ -56,8 +72,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
runtime.register_object_store(&Url::parse("s3://ducklake-data/")?, s3);

// Create the DuckLake catalog
let ducklake_catalog = DuckLakeCatalog::new(provider)?;
// Create the DuckLake catalog bound to the snapshot
// This ensures all queries through this catalog see consistent data
// from this specific snapshot, even if the underlying data changes
let ducklake_catalog = DuckLakeCatalog::with_snapshot(provider, snapshot_id)?;

// Alternative: Use the backward-compatible constructor that automatically
// binds to the current snapshot:
// let ducklake_catalog = DuckLakeCatalog::new(DuckdbMetadataProvider::new(catalog_path)?)?;

println!("✓ Connected to DuckLake catalog");

Expand Down
49 changes: 29 additions & 20 deletions src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
///
/// Connects to a DuckLake catalog database and provides access to schemas and tables.
/// Uses dynamic metadata lookup - schemas are queried on-demand from the catalog database.
/// Bound to a specific snapshot ID for query consistency.
#[derive(Debug)]
pub struct DuckLakeCatalog {
/// Metadata provider for querying catalog
provider: Arc<dyn MetadataProvider>,
/// Snapshot ID this catalog is bound to (for query consistency)
snapshot_id: i64,
/// Object store URL for resolving file paths (e.g., s3://bucket/ or file:///)
object_store_url: Arc<ObjectStoreUrl>,
/// Catalog base path component for resolving relative schema paths (e.g., /prefix/)
Expand All @@ -26,22 +29,38 @@ pub struct DuckLakeCatalog {

impl DuckLakeCatalog {
/// Create a new DuckLake catalog with a metadata provider
///
/// Gets the current snapshot ID at creation time and binds the catalog to it.
/// For backward compatibility. For explicit snapshot control, use `with_snapshot()`.
pub fn new(provider: impl MetadataProvider + 'static) -> Result<Self> {
let provider = Arc::new(provider) as Arc<dyn MetadataProvider>;
let snapshot_id = provider.get_current_snapshot()?;
let data_path = provider.get_data_path()?;
let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;

Ok(Self {
provider,
snapshot_id,
object_store_url: Arc::new(object_store_url),
catalog_path,
})
}

fn get_current_snapshot_id(&self) -> Result<i64> {
self.provider
.get_current_snapshot()
.inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot"))
/// Create a catalog bound to a specific snapshot ID
///
/// All schemas and tables returned will use this snapshot, guaranteeing
/// query consistency even if multiple catalog/schema/table lookups occur
/// during query planning.
pub fn with_snapshot(provider: Arc<dyn MetadataProvider>, snapshot_id: i64) -> Result<Self> {
let data_path = provider.get_data_path()?;
let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;

Ok(Self {
provider,
snapshot_id,
object_store_url: Arc::new(object_store_url),
catalog_path,
})
}
}

Expand All @@ -51,28 +70,18 @@ impl CatalogProvider for DuckLakeCatalog {
}

fn schema_names(&self) -> Vec<String> {
let snapshot_id = match self.get_current_snapshot_id() {
Ok(id) => id,
Err(_) => return Vec::new(),
};

// Query database with snapshot_id
// Use the catalog's pinned snapshot_id
self.provider
.list_schemas(snapshot_id)
.list_schemas(self.snapshot_id)
.unwrap_or_default()
.into_iter()
.map(|s| s.schema_name)
.collect()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let snapshot_id = match self.get_current_snapshot_id() {
Ok(id) => id,
Err(_) => return None,
};

// Query database with snapshot_id
match self.provider.get_schema_by_name(name, snapshot_id) {
// Use the catalog's pinned snapshot_id
match self.provider.get_schema_by_name(name, self.snapshot_id) {
Ok(Some(meta)) => {
// Resolve schema path hierarchically
let schema_path = if meta.path_is_relative {
Expand All @@ -83,12 +92,12 @@ impl CatalogProvider for DuckLakeCatalog {
meta.path
};

// Pass snapshot_id to schema
// Pass the pinned snapshot_id to schema
Some(Arc::new(DuckLakeSchema::new(
meta.schema_id,
meta.schema_name,
Arc::clone(&self.provider),
snapshot_id, // Propagate snapshot_id
self.snapshot_id, // Propagate pinned snapshot_id
self.object_store_url.clone(),
schema_path,
)) as Arc<dyn SchemaProvider>)
Expand Down
Loading