Skip to content

Commit ed394d9

Browse files
Implements snapshot-pinned catalog to guarantee query consistency
1 parent 1cd5657 commit ed394d9

2 files changed

Lines changed: 60 additions & 28 deletions

File tree

examples/basic_query.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,30 @@
1-
//! Basic DuckLake query example
1+
//! Basic DuckLake query example with snapshot isolation
22
//!
33
//! This example demonstrates how to:
44
//! 1. Create a DuckLake catalog from a DuckDB catalog file
5-
//! 2. Register it with DataFusion
6-
//! 3. Execute a simple SELECT query
5+
//! 2. Bind the catalog to a specific snapshot for query consistency
6+
//! 3. Register it with DataFusion
7+
//! 4. Execute a simple SELECT query
8+
//!
9+
//! ## Snapshot Isolation
10+
//!
11+
//! Each DuckLake catalog is bound to a specific snapshot ID at creation time.
12+
//! This guarantees that all queries through that catalog see a consistent view
13+
//! of the data, even if multiple schema/table lookups happen during query planning
14+
//! or if the underlying data changes.
15+
//!
16+
//! To query data at different points in time, create separate catalogs bound to
17+
//! different snapshot IDs.
718
//!
819
//! To run this example, you need:
920
//! - A DuckDB database file with DuckLake tables
1021
//! - Parquet data files referenced by the catalog
1122
//!
12-
//! Usage: cargo run --example basic_query
23+
//! Usage: cargo run --example basic_query <catalog.db> <sql>
1324
1425
use datafusion::execution::runtime_env::RuntimeEnv;
1526
use datafusion::prelude::*;
16-
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider};
27+
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider};
1728
use object_store::ObjectStore;
1829
use object_store::aws::AmazonS3Builder;
1930
use std::env;
@@ -37,7 +48,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3748
println!("Connecting to DuckLake catalog: {}", catalog_path);
3849

3950
// Create the metadata provider
40-
let provider = DuckdbMetadataProvider::new(catalog_path)?;
51+
let provider = Arc::new(DuckdbMetadataProvider::new(catalog_path)?);
52+
53+
// Get the current snapshot ID
54+
// This ensures query consistency - all metadata lookups will use this snapshot
55+
let snapshot_id = provider.get_current_snapshot()?;
56+
println!("Current snapshot ID: {}", snapshot_id);
4157

4258
// Create runtime and register object stores
4359
// For MinIO or S3, register the object store with the runtime
@@ -56,8 +72,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5672
);
5773
runtime.register_object_store(&Url::parse("s3://ducklake-data/")?, s3);
5874

59-
// Create the DuckLake catalog
60-
let ducklake_catalog = DuckLakeCatalog::new(provider)?;
75+
// Create the DuckLake catalog bound to the snapshot
76+
// This ensures all queries through this catalog see consistent data
77+
// from this specific snapshot, even if the underlying data changes
78+
let ducklake_catalog = DuckLakeCatalog::with_snapshot(provider, snapshot_id)?;
79+
80+
// Alternative: Use the backward-compatible constructor that automatically
81+
// binds to the current snapshot:
82+
// let ducklake_catalog = DuckLakeCatalog::new(DuckdbMetadataProvider::new(catalog_path)?)?;
6183

6284
println!("✓ Connected to DuckLake catalog");
6385

src/catalog.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
1414
///
1515
/// Connects to a DuckLake catalog database and provides access to schemas and tables.
1616
/// Uses dynamic metadata lookup - schemas are queried on-demand from the catalog database.
17+
/// Bound to a specific snapshot ID for query consistency.
1718
#[derive(Debug)]
1819
pub struct DuckLakeCatalog {
1920
/// Metadata provider for querying catalog
2021
provider: Arc<dyn MetadataProvider>,
22+
/// Snapshot ID this catalog is bound to (for query consistency)
23+
snapshot_id: i64,
2124
/// Object store URL for resolving file paths (e.g., s3://bucket/ or file:///)
2225
object_store_url: Arc<ObjectStoreUrl>,
2326
/// Catalog base path component for resolving relative schema paths (e.g., /prefix/)
@@ -26,22 +29,39 @@ pub struct DuckLakeCatalog {
2629

2730
impl DuckLakeCatalog {
2831
/// Create a new DuckLake catalog with a metadata provider
32+
///
33+
/// Gets the current snapshot ID at creation time and binds the catalog to it.
34+
/// For backward compatibility. For explicit snapshot control, use `with_snapshot()`.
2935
pub fn new(provider: impl MetadataProvider + 'static) -> Result<Self> {
3036
let provider = Arc::new(provider) as Arc<dyn MetadataProvider>;
37+
let snapshot_id = provider.get_current_snapshot()?;
3138
let data_path = provider.get_data_path()?;
3239
let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;
3340

3441
Ok(Self {
3542
provider,
43+
snapshot_id,
3644
object_store_url: Arc::new(object_store_url),
3745
catalog_path,
3846
})
3947
}
4048

41-
fn get_current_snapshot_id(&self) -> Result<i64> {
42-
self.provider
43-
.get_current_snapshot()
44-
.inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot"))
49+
/// Create a catalog bound to a specific snapshot ID
50+
///
51+
/// All schemas and tables returned will use this snapshot, guaranteeing
52+
/// query consistency even if multiple catalog/schema/table lookups occur
53+
/// during query planning.
54+
55+
pub fn with_snapshot(provider: Arc<dyn MetadataProvider>, snapshot_id: i64) -> Result<Self> {
56+
let data_path = provider.get_data_path()?;
57+
let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;
58+
59+
Ok(Self {
60+
provider,
61+
snapshot_id,
62+
object_store_url: Arc::new(object_store_url),
63+
catalog_path,
64+
})
4565
}
4666
}
4767

@@ -51,28 +71,18 @@ impl CatalogProvider for DuckLakeCatalog {
5171
}
5272

5373
fn schema_names(&self) -> Vec<String> {
54-
let snapshot_id = match self.get_current_snapshot_id() {
55-
Ok(id) => id,
56-
Err(_) => return Vec::new(),
57-
};
58-
59-
// Query database with snapshot_id
74+
// Use the catalog's pinned snapshot_id
6075
self.provider
61-
.list_schemas(snapshot_id)
76+
.list_schemas(self.snapshot_id)
6277
.unwrap_or_default()
6378
.into_iter()
6479
.map(|s| s.schema_name)
6580
.collect()
6681
}
6782

6883
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
69-
let snapshot_id = match self.get_current_snapshot_id() {
70-
Ok(id) => id,
71-
Err(_) => return None,
72-
};
73-
74-
// Query database with snapshot_id
75-
match self.provider.get_schema_by_name(name, snapshot_id) {
84+
// Use the catalog's pinned snapshot_id
85+
match self.provider.get_schema_by_name(name, self.snapshot_id) {
7686
Ok(Some(meta)) => {
7787
// Resolve schema path hierarchically
7888
let schema_path = if meta.path_is_relative {
@@ -83,12 +93,12 @@ impl CatalogProvider for DuckLakeCatalog {
8393
meta.path
8494
};
8595

86-
// Pass snapshot_id to schema
96+
// Pass the pinned snapshot_id to schema
8797
Some(Arc::new(DuckLakeSchema::new(
8898
meta.schema_id,
8999
meta.schema_name,
90100
Arc::clone(&self.provider),
91-
snapshot_id, // Propagate snapshot_id
101+
self.snapshot_id, // Propagate pinned snapshot_id
92102
self.object_store_url.clone(),
93103
schema_path,
94104
)) as Arc<dyn SchemaProvider>)

0 commit comments

Comments
 (0)