-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathcatalog.rs
More file actions
141 lines (124 loc) · 4.94 KB
/
catalog.rs
File metadata and controls
141 lines (124 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//! DuckLake catalog provider implementation
use std::any::Any;
use std::sync::Arc;
use crate::Result;
use crate::information_schema::InformationSchemaProvider;
use crate::metadata_provider::MetadataProvider;
use crate::path_resolver::parse_object_store_url;
use crate::schema::DuckLakeSchema;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use datafusion::datasource::object_store::ObjectStoreUrl;
/// DuckLake catalog provider.
///
/// Connects to a DuckLake catalog database and exposes its schemas and tables to
/// DataFusion through [`CatalogProvider`]. Schemas are not cached at construction
/// time: each schema lookup queries the [`MetadataProvider`] on demand.
///
/// Data-schema lookups are pinned to a single snapshot ID, so repeated catalog /
/// schema / table lookups made while planning one query observe the same catalog
/// state.
#[derive(Debug)]
pub struct DuckLakeCatalog {
    /// Metadata provider used to query the catalog database.
    provider: Arc<dyn MetadataProvider>,
    /// Snapshot ID passed to every data-schema metadata lookup (query consistency).
    snapshot_id: i64,
    /// Object store URL used to resolve data file paths (e.g. `s3://bucket/` or
    /// `file:///`); a shared handle is given to every schema this catalog creates.
    object_store_url: Arc<ObjectStoreUrl>,
    /// Catalog base path component (e.g. `/prefix/`). It is prepended verbatim to a
    /// schema's path when that path is marked relative, so it presumably ends with a
    /// `/` — NOTE(review): confirm against `parse_object_store_url`.
    catalog_path: String,
}
impl DuckLakeCatalog {
    /// Create a new DuckLake catalog bound to the provider's current snapshot.
    ///
    /// The current snapshot ID is read once, here, and the catalog stays bound to it
    /// for its whole lifetime. Kept for backward compatibility; use
    /// [`DuckLakeCatalog::with_snapshot`] for explicit snapshot control.
    ///
    /// # Errors
    ///
    /// Returns an error if the provider cannot report its current snapshot ID or its
    /// data path, or if the data path cannot be parsed into an object store URL.
    pub fn new(provider: impl MetadataProvider + 'static) -> Result<Self> {
        let provider: Arc<dyn MetadataProvider> = Arc::new(provider);
        let snapshot_id = provider.get_current_snapshot()?;
        // Delegate so both constructors share one path-resolution implementation.
        Self::with_snapshot(provider, snapshot_id)
    }

    /// Create a catalog bound to a specific snapshot ID.
    ///
    /// All data schemas and tables returned will use this snapshot, guaranteeing
    /// query consistency even if multiple catalog/schema/table lookups occur
    /// during query planning.
    ///
    /// # Errors
    ///
    /// Returns an error if the provider cannot report its data path, or if the data
    /// path cannot be parsed into an object store URL.
    pub fn with_snapshot(provider: Arc<dyn MetadataProvider>, snapshot_id: i64) -> Result<Self> {
        let data_path = provider.get_data_path()?;
        // Split the data path into the object store root and the catalog base path
        // that relative schema paths are appended to.
        let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?;
        Ok(Self {
            provider,
            snapshot_id,
            object_store_url: Arc::new(object_store_url),
            catalog_path,
        })
    }

    /// Get a shared handle to the metadata provider for this catalog.
    ///
    /// This is useful when you need to register table functions separately.
    pub fn provider(&self) -> Arc<dyn MetadataProvider> {
        Arc::clone(&self.provider)
    }
}
impl CatalogProvider for DuckLakeCatalog {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// List the schema names visible in this catalog at the pinned snapshot.
    ///
    /// Always includes `information_schema`. This trait method cannot return an
    /// error, so if listing the data schemas fails, the error is logged and only
    /// `information_schema` is returned. The result is sorted and de-duplicated.
    fn schema_names(&self) -> Vec<String> {
        // Start with information_schema
        let mut names = vec!["information_schema".to_string()];
        // Add data schemas from catalog using the pinned snapshot_id
        let data_schemas = self
            .provider
            .list_schemas(self.snapshot_id)
            .inspect_err(|e| {
                tracing::error!(
                    error = %e,
                    snapshot_id = %self.snapshot_id,
                    "Failed to list schemas from catalog"
                )
            })
            .unwrap_or_default()
            .into_iter()
            .map(|s| s.schema_name);
        names.extend(data_schemas);
        // Ensure deterministic order and no duplicates
        names.sort();
        names.dedup();
        names
    }

    /// Look up a schema by name.
    ///
    /// `information_schema` is served by [`InformationSchemaProvider`]. Any other name
    /// is resolved against the catalog at the pinned snapshot. Returns `None` when
    /// the schema does not exist. Returns `None` too when the metadata lookup fails,
    /// because this trait method cannot report errors; the failure is logged.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        // Handle information_schema specially.
        // NOTE(review): this provider is not given `self.snapshot_id`, so it may not
        // observe the same snapshot as the data schemas — confirm this is intended.
        if name == "information_schema" {
            return Some(Arc::new(InformationSchemaProvider::new(Arc::clone(
                &self.provider,
            ))));
        }

        // Query database with the pinned snapshot_id for data schemas
        let meta = match self.provider.get_schema_by_name(name, self.snapshot_id) {
            Ok(Some(meta)) => meta,
            Ok(None) => return None,
            Err(e) => {
                // Previously swallowed silently; log it like `schema_names` does so
                // a catalog failure is distinguishable from a missing schema.
                tracing::error!(
                    error = %e,
                    schema_name = %name,
                    snapshot_id = %self.snapshot_id,
                    "Failed to look up schema in catalog"
                );
                return None;
            },
        };

        // Resolve schema path hierarchically
        let schema_path = if meta.path_is_relative {
            // Schema path is relative to catalog path
            format!("{}{}", self.catalog_path, meta.path)
        } else {
            // Schema path is absolute
            meta.path
        };

        // Pass the pinned snapshot_id to schema
        Some(Arc::new(DuckLakeSchema::new(
            meta.schema_id,
            meta.schema_name,
            Arc::clone(&self.provider),
            self.snapshot_id, // Propagate pinned snapshot_id
            Arc::clone(&self.object_store_url),
            schema_path,
        )) as Arc<dyn SchemaProvider>)
    }
}