Skip to content
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ A DataFusion extension for querying [DuckLake](https://ducklake.select). DuckLak
- Parquet footer size hints for optimized I/O
- Filter pushdown to Parquet for row group pruning and page-level filtering
- Dynamic metadata lookup (no upfront catalog caching)
- SQL-queryable `information_schema` for catalog metadata (snapshots, schemas, tables, columns, files)
- DuckDB-style table functions: `ducklake_snapshots()`, `ducklake_table_info()`, `ducklake_list_files()`

## Known Limitations

Expand Down
15 changes: 13 additions & 2 deletions examples/basic_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::*;
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider};
use datafusion_ducklake::{
DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider, register_ducklake_functions,
};
use object_store::ObjectStore;
use object_store::aws::AmazonS3Builder;
use std::env;
Expand Down Expand Up @@ -88,10 +90,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create DataFusion session context
let ctx = SessionContext::new_with_config_rt(config, runtime.clone());

// Register the DuckLake catalog
// Get the provider before moving the catalog
let provider = ducklake_catalog.provider();

// Register the DuckLake catalog (standard DataFusion pattern)
ctx.register_catalog("ducklake", Arc::new(ducklake_catalog));

// Register table functions (ducklake_snapshots, ducklake_table_info, ducklake_list_files)
register_ducklake_functions(&ctx, provider);

println!("✓ Registered DuckLake catalog with DataFusion");
println!(
"✓ Registered table functions (ducklake_snapshots, ducklake_table_info, ducklake_list_files)"
);

// List available schemas
let catalogs = ctx.catalog_names();
Expand Down
43 changes: 38 additions & 5 deletions src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::any::Any;
use std::sync::Arc;

use crate::Result;
use crate::information_schema::InformationSchemaProvider;
use crate::metadata_provider::MetadataProvider;
use crate::path_resolver::parse_object_store_url;
use crate::schema::DuckLakeSchema;
Expand Down Expand Up @@ -62,6 +63,13 @@ impl DuckLakeCatalog {
catalog_path,
})
}

/// Get the metadata provider for this catalog
///
/// This is useful when you need to register table functions separately.
pub fn provider(&self) -> Arc<dyn MetadataProvider> {
self.provider.clone()
}
}

impl CatalogProvider for DuckLakeCatalog {
Expand All @@ -70,17 +78,42 @@ impl CatalogProvider for DuckLakeCatalog {
}

fn schema_names(&self) -> Vec<String> {
// Use the catalog's pinned snapshot_id
self.provider
// Start with information_schema
let mut names = vec!["information_schema".to_string()];

// Add data schemas from catalog using the pinned snapshot_id
let data_schemas = self
.provider
.list_schemas(self.snapshot_id)
.inspect_err(|e| {
tracing::error!(
error = %e,
snapshot_id = %self.snapshot_id,
"Failed to list schemas from catalog"
)
})
.unwrap_or_default()
.into_iter()
.map(|s| s.schema_name)
.collect()
.map(|s| s.schema_name);

names.extend(data_schemas);

// Ensure deterministic order and no duplicates
names.sort();
names.dedup();

names
Comment thread
shefeek-jinnah marked this conversation as resolved.
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
// Use the catalog's pinned snapshot_id
// Handle information_schema specially
if name == "information_schema" {
return Some(Arc::new(InformationSchemaProvider::new(Arc::clone(
&self.provider,
))));
}

// Query database with the pinned snapshot_id for data schemas
match self.provider.get_schema_by_name(name, self.snapshot_id) {
Ok(Some(meta)) => {
// Resolve schema path hierarchically
Expand Down
Loading
Loading