Skip to content

Commit 86eef49

Browse files
2 parents 73baae2 + 304ed9b commit 86eef49

2 files changed

Lines changed: 62 additions & 31 deletions

File tree

examples/basic_query.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,30 @@
1-
//! Basic DuckLake query example
1+
//! Basic DuckLake query example with snapshot isolation
22
//!
33
//! This example demonstrates how to:
44
//! 1. Create a DuckLake catalog from a DuckDB catalog file
5-
//! 2. Register it with DataFusion
6-
//! 3. Execute a simple SELECT query
5+
//! 2. Bind the catalog to a specific snapshot for query consistency
6+
//! 3. Register it with DataFusion
7+
//! 4. Execute a simple SELECT query
8+
//!
9+
//! ## Snapshot Isolation
10+
//!
11+
//! Each DuckLake catalog is bound to a specific snapshot ID at creation time.
12+
//! This guarantees that all queries through that catalog see a consistent view
13+
//! of the data, even if multiple schema/table lookups happen during query planning
14+
//! or if the underlying data changes.
15+
//!
16+
//! To query data at different points in time, create separate catalogs bound to
17+
//! different snapshot IDs.
718
//!
819
//! To run this example, you need:
920
//! - A DuckDB database file with DuckLake tables
1021
//! - Parquet data files referenced by the catalog
1122
//!
12-
//! Usage: cargo run --example basic_query
23+
//! Usage: cargo run --example basic_query <catalog.db> <sql>
1324
1425
use datafusion::execution::runtime_env::RuntimeEnv;
1526
use datafusion::prelude::*;
16-
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider, register_ducklake_functions};
27+
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider, register_ducklake_functions};
1728
use object_store::ObjectStore;
1829
use object_store::aws::AmazonS3Builder;
1930
use std::env;
@@ -37,7 +48,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3748
println!("Connecting to DuckLake catalog: {}", catalog_path);
3849

3950
// Create the metadata provider
40-
let provider = DuckdbMetadataProvider::new(catalog_path)?;
51+
let provider = Arc::new(DuckdbMetadataProvider::new(catalog_path)?);
52+
53+
// Get the current snapshot ID
54+
// This ensures query consistency - all metadata lookups will use this snapshot
55+
let snapshot_id = provider.get_current_snapshot()?;
56+
println!("Current snapshot ID: {}", snapshot_id);
4157

4258
// Create runtime and register object stores
4359
// For MinIO or S3, register the object store with the runtime
@@ -56,8 +72,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5672
);
5773
runtime.register_object_store(&Url::parse("s3://ducklake-data/")?, s3);
5874

59-
// Create the DuckLake catalog
60-
let ducklake_catalog = DuckLakeCatalog::new(provider)?;
75+
// Create the DuckLake catalog bound to the snapshot
76+
// This ensures all queries through this catalog see consistent data
77+
// from this specific snapshot, even if the underlying data changes
78+
let ducklake_catalog = DuckLakeCatalog::with_snapshot(provider, snapshot_id)?;
79+
80+
// Alternative: Use the backward-compatible constructor that automatically
81+
// binds to the current snapshot:
82+
// let ducklake_catalog = DuckLakeCatalog::new(DuckdbMetadataProvider::new(catalog_path)?)?;
6183

6284
println!("✓ Connected to DuckLake catalog");
6385

src/catalog.rs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
1515
///
1616
/// Connects to a DuckLake catalog database and provides access to schemas and tables.
1717
/// Uses dynamic metadata lookup — schemas are queried on demand from the catalog database.
18+
/// Bound to a specific snapshot ID for query consistency.
1819
#[derive(Debug)]
1920
pub struct DuckLakeCatalog {
2021
/// Metadata provider for querying catalog
2122
provider: Arc<dyn MetadataProvider>,
23+
/// Snapshot ID this catalog is bound to (for query consistency)
24+
snapshot_id: i64,
2225
/// Object store URL for resolving file paths (e.g., s3://bucket/ or file:///)
2326
object_store_url: Arc<ObjectStoreUrl>,
2427
/// Catalog base path component for resolving relative schema paths (e.g., /prefix/)
@@ -27,13 +30,35 @@ pub struct DuckLakeCatalog {
2730

2831
impl DuckLakeCatalog {
2932
/// Create a new DuckLake catalog with a metadata provider
33+
///
34+
/// Gets the current snapshot ID at creation time and binds the catalog to it.
35+
/// Kept for backward compatibility; for explicit snapshot control, use `with_snapshot()`.
3036
pub fn new(provider: impl MetadataProvider + 'static) -> Result<Self> {
3137
let provider = Arc::new(provider) as Arc<dyn MetadataProvider>;
38+
let snapshot_id = provider.get_current_snapshot()?;
39+
let data_path = provider.get_data_path()?;
40+
let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;
41+
42+
Ok(Self {
43+
provider,
44+
snapshot_id,
45+
object_store_url: Arc::new(object_store_url),
46+
catalog_path,
47+
})
48+
}
49+
50+
/// Create a catalog bound to a specific snapshot ID
51+
///
52+
/// All schemas and tables returned will use this snapshot, guaranteeing
53+
/// query consistency even if multiple catalog/schema/table lookups occur
54+
/// during query planning.
55+
pub fn with_snapshot(provider: Arc<dyn MetadataProvider>, snapshot_id: i64) -> Result<Self> {
3256
let data_path = provider.get_data_path()?;
3357
let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;
3458

3559
Ok(Self {
3660
provider,
61+
snapshot_id,
3762
object_store_url: Arc::new(object_store_url),
3863
catalog_path,
3964
})
@@ -45,12 +70,6 @@ impl DuckLakeCatalog {
4570
pub fn provider(&self) -> Arc<dyn MetadataProvider> {
4671
self.provider.clone()
4772
}
48-
49-
fn get_current_snapshot_id(&self) -> Result<i64> {
50-
self.provider
51-
.get_current_snapshot()
52-
.inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot"))
53-
}
5473
}
5574

5675
impl CatalogProvider for DuckLakeCatalog {
@@ -59,22 +78,17 @@ impl CatalogProvider for DuckLakeCatalog {
5978
}
6079

6180
fn schema_names(&self) -> Vec<String> {
62-
let snapshot_id = match self.get_current_snapshot_id() {
63-
Ok(id) => id,
64-
Err(_) => return vec!["information_schema".to_string()],
65-
};
66-
6781
// Start with information_schema
6882
let mut names = vec!["information_schema".to_string()];
6983

70-
// Add data schemas from catalog
84+
// Add data schemas from catalog using the pinned snapshot_id
7185
let data_schemas = self
7286
.provider
73-
.list_schemas(snapshot_id)
87+
.list_schemas(self.snapshot_id)
7488
.inspect_err(|e| {
7589
tracing::error!(
7690
error = %e,
77-
snapshot_id = %snapshot_id,
91+
snapshot_id = %self.snapshot_id,
7892
"Failed to list schemas from catalog"
7993
)
8094
})
@@ -99,13 +113,8 @@ impl CatalogProvider for DuckLakeCatalog {
99113
))));
100114
}
101115

102-
let snapshot_id = match self.get_current_snapshot_id() {
103-
Ok(id) => id,
104-
Err(_) => return None,
105-
};
106-
107-
// Query database with snapshot_id for data schemas
108-
match self.provider.get_schema_by_name(name, snapshot_id) {
116+
// Query database with the pinned snapshot_id for data schemas
117+
match self.provider.get_schema_by_name(name, self.snapshot_id) {
109118
Ok(Some(meta)) => {
110119
// Resolve schema path hierarchically
111120
let schema_path = if meta.path_is_relative {
@@ -116,12 +125,12 @@ impl CatalogProvider for DuckLakeCatalog {
116125
meta.path
117126
};
118127

119-
// Pass snapshot_id to schema
128+
// Pass the pinned snapshot_id to schema
120129
Some(Arc::new(DuckLakeSchema::new(
121130
meta.schema_id,
122131
meta.schema_name,
123132
Arc::clone(&self.provider),
124-
snapshot_id, // Propagate snapshot_id
133+
self.snapshot_id, // Propagate pinned snapshot_id
125134
self.object_store_url.clone(),
126135
schema_path,
127136
)) as Arc<dyn SchemaProvider>)

0 commit comments

Comments
 (0)