Skip to content

Commit bc33657

Browse files
authored
feat!(storage): Integration Storage trait with FileIO (apache#2116)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - This depends on apache#2109 - Closes apache#2058 ## What changes are included in this PR? - Update `FileIO` to hold `dyn Storage` instead of `OpenDalStorage` - Update `FileIOBuilder` accordingly - Removed `Extensions` from `FileIO` - Add `with_storage_factory` in `CatalogBuilder` and update all implementations accordingly - Add an optional `dyn StorageFactory` to `IcebergTableProviderFactory` - Update other FileIOBuilder usages accordingly <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? Mostly rely on the existing tests, added some uts for FileIOBuilder <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->
1 parent e844d82 commit bc33657

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+859
-680
lines changed

bindings/python/src/datafusion_table_provider.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::sync::Arc;
2222
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2323
use datafusion_ffi::table_provider::FFI_TableProvider;
2424
use iceberg::TableIdent;
25-
use iceberg::io::FileIO;
25+
use iceberg::io::{FileIOBuilder, OpenDalStorageFactory, StorageFactory};
2626
use iceberg::table::StaticTable;
2727
use iceberg_datafusion::table::IcebergStaticTableProvider;
2828
use pyo3::exceptions::{PyRuntimeError, PyValueError};
@@ -31,6 +31,30 @@ use pyo3::types::{PyAny, PyCapsule};
3131

3232
use crate::runtime::runtime;
3333

34+
/// Parse the scheme from a URL and return the appropriate StorageFactory.
35+
fn storage_factory_from_path(path: &str) -> PyResult<Arc<dyn StorageFactory>> {
36+
let scheme = path
37+
.split("://")
38+
.next()
39+
.ok_or_else(|| PyRuntimeError::new_err(format!("Invalid path, missing scheme: {path}")))?;
40+
41+
let factory: Arc<dyn StorageFactory> = match scheme {
42+
"file" | "" => Arc::new(OpenDalStorageFactory::Fs),
43+
"s3" | "s3a" => Arc::new(OpenDalStorageFactory::S3 {
44+
configured_scheme: scheme.to_string(),
45+
customized_credential_load: None,
46+
}),
47+
"memory" => Arc::new(OpenDalStorageFactory::Memory),
48+
_ => {
49+
return Err(PyRuntimeError::new_err(format!(
50+
"Unsupported storage scheme: {scheme}"
51+
)));
52+
}
53+
};
54+
55+
Ok(factory)
56+
}
57+
3458
pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
3559
let capsule_name = capsule.name()?;
3660
if capsule_name.is_none() {
@@ -85,16 +109,15 @@ impl PyIcebergDataFusionTable {
85109
let table_ident = TableIdent::from_strs(identifier)
86110
.map_err(|e| PyRuntimeError::new_err(format!("Invalid table identifier: {e}")))?;
87111

88-
let mut builder = FileIO::from_path(&metadata_location)
89-
.map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?;
112+
let factory = storage_factory_from_path(&metadata_location)?;
113+
114+
let mut builder = FileIOBuilder::new(factory);
90115

91116
if let Some(props) = file_io_properties {
92117
builder = builder.with_props(props);
93118
}
94119

95-
let file_io = builder
96-
.build()
97-
.map_err(|e| PyRuntimeError::new_err(format!("Failed to build FileIO: {e}")))?;
120+
let file_io = builder.build();
98121

99122
let static_table =
100123
StaticTable::from_metadata_file(&metadata_location, table_ident, file_io)

crates/catalog/glue/src/catalog.rs

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
use std::collections::HashMap;
1919
use std::fmt::Debug;
20+
use std::sync::Arc;
2021

2122
use anyhow::anyhow;
2223
use async_trait::async_trait;
2324
use aws_sdk_glue::operation::create_table::CreateTableError;
2425
use aws_sdk_glue::operation::update_table::UpdateTableError;
2526
use aws_sdk_glue::types::TableInput;
2627
use iceberg::io::{
27-
FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
28+
FileIO, FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
29+
S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory,
2830
};
2931
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
3032
use iceberg::table::Table;
@@ -51,47 +53,58 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
5153

5254
/// Builder for [`GlueCatalog`].
5355
#[derive(Debug)]
54-
pub struct GlueCatalogBuilder(GlueCatalogConfig);
56+
pub struct GlueCatalogBuilder {
57+
config: GlueCatalogConfig,
58+
storage_factory: Option<Arc<dyn StorageFactory>>,
59+
}
5560

5661
impl Default for GlueCatalogBuilder {
5762
fn default() -> Self {
58-
Self(GlueCatalogConfig {
59-
name: None,
60-
uri: None,
61-
catalog_id: None,
62-
warehouse: "".to_string(),
63-
props: HashMap::new(),
64-
})
63+
Self {
64+
config: GlueCatalogConfig {
65+
name: None,
66+
uri: None,
67+
catalog_id: None,
68+
warehouse: "".to_string(),
69+
props: HashMap::new(),
70+
},
71+
storage_factory: None,
72+
}
6573
}
6674
}
6775

6876
impl CatalogBuilder for GlueCatalogBuilder {
6977
type C = GlueCatalog;
7078

79+
fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
80+
self.storage_factory = Some(storage_factory);
81+
self
82+
}
83+
7184
fn load(
7285
mut self,
7386
name: impl Into<String>,
7487
props: HashMap<String, String>,
7588
) -> impl Future<Output = Result<Self::C>> + Send {
76-
self.0.name = Some(name.into());
89+
self.config.name = Some(name.into());
7790

7891
if props.contains_key(GLUE_CATALOG_PROP_URI) {
79-
self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
92+
self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
8093
}
8194

8295
if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
83-
self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
96+
self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
8497
}
8598

8699
if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
87-
self.0.warehouse = props
100+
self.config.warehouse = props
88101
.get(GLUE_CATALOG_PROP_WAREHOUSE)
89102
.cloned()
90103
.unwrap_or_default();
91104
}
92105

93106
// Collect other remaining properties
94-
self.0.props = props
107+
self.config.props = props
95108
.into_iter()
96109
.filter(|(k, _)| {
97110
k != GLUE_CATALOG_PROP_URI
@@ -101,20 +114,20 @@ impl CatalogBuilder for GlueCatalogBuilder {
101114
.collect();
102115

103116
async move {
104-
if self.0.name.is_none() {
117+
if self.config.name.is_none() {
105118
return Err(Error::new(
106119
ErrorKind::DataInvalid,
107120
"Catalog name is required",
108121
));
109122
}
110-
if self.0.warehouse.is_empty() {
123+
if self.config.warehouse.is_empty() {
111124
return Err(Error::new(
112125
ErrorKind::DataInvalid,
113126
"Catalog warehouse is required",
114127
));
115128
}
116129

117-
GlueCatalog::new(self.0).await
130+
GlueCatalog::new(self.config, self.storage_factory).await
118131
}
119132
}
120133
}
@@ -148,7 +161,10 @@ impl Debug for GlueCatalog {
148161

149162
impl GlueCatalog {
150163
/// Create a new glue catalog
151-
async fn new(config: GlueCatalogConfig) -> Result<Self> {
164+
async fn new(
165+
config: GlueCatalogConfig,
166+
storage_factory: Option<Arc<dyn StorageFactory>>,
167+
) -> Result<Self> {
152168
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
153169
let mut file_io_props = config.props.clone();
154170
if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
@@ -182,9 +198,16 @@ impl GlueCatalog {
182198

183199
let client = aws_sdk_glue::Client::new(&sdk_config);
184200

185-
let file_io = FileIO::from_path(&config.warehouse)?
201+
// Use provided factory or default to OpenDalStorageFactory::S3
202+
let factory = storage_factory.unwrap_or_else(|| {
203+
Arc::new(OpenDalStorageFactory::S3 {
204+
configured_scheme: "s3a".to_string(),
205+
customized_credential_load: None,
206+
})
207+
});
208+
let file_io = FileIOBuilder::new(factory)
186209
.with_props(file_io_props)
187-
.build()?;
210+
.build();
188211

189212
Ok(GlueCatalog {
190213
config,

crates/catalog/glue/tests/glue_catalog_test.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
//! Each test uses unique namespaces based on module path to avoid conflicts.
2222
2323
use std::collections::HashMap;
24+
use std::sync::Arc;
2425

25-
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
26+
use iceberg::io::{
27+
FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
28+
S3_SECRET_ACCESS_KEY,
29+
};
2630
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
2731
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2832
use iceberg::{
@@ -59,11 +63,12 @@ async fn get_catalog() -> GlueCatalog {
5963
]);
6064

6165
// Wait for bucket to actually exist
62-
let file_io = iceberg::io::FileIO::from_path("s3a://")
63-
.unwrap()
64-
.with_props(props.clone())
65-
.build()
66-
.unwrap();
66+
let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 {
67+
configured_scheme: "s3a".to_string(),
68+
customized_credential_load: None,
69+
}))
70+
.with_props(props.clone())
71+
.build();
6772

6873
let mut retries = 0;
6974
while retries < 30 {

crates/catalog/hms/src/catalog.rs

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
use std::collections::HashMap;
1919
use std::fmt::{Debug, Formatter};
2020
use std::net::ToSocketAddrs;
21+
use std::sync::Arc;
2122

2223
use anyhow::anyhow;
2324
use async_trait::async_trait;
2425
use hive_metastore::{
2526
ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
2627
ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
2728
};
28-
use iceberg::io::FileIO;
29+
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
2930
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
3031
use iceberg::table::Table;
3132
use iceberg::{
@@ -50,52 +51,63 @@ pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
5051
/// HMS Catalog warehouse location
5152
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
5253

53-
/// Builder for [`RestCatalog`].
54+
/// Builder for [`HmsCatalog`].
5455
#[derive(Debug)]
55-
pub struct HmsCatalogBuilder(HmsCatalogConfig);
56+
pub struct HmsCatalogBuilder {
57+
config: HmsCatalogConfig,
58+
storage_factory: Option<Arc<dyn StorageFactory>>,
59+
}
5660

5761
impl Default for HmsCatalogBuilder {
5862
fn default() -> Self {
59-
Self(HmsCatalogConfig {
60-
name: None,
61-
address: "".to_string(),
62-
thrift_transport: HmsThriftTransport::default(),
63-
warehouse: "".to_string(),
64-
props: HashMap::new(),
65-
})
63+
Self {
64+
config: HmsCatalogConfig {
65+
name: None,
66+
address: "".to_string(),
67+
thrift_transport: HmsThriftTransport::default(),
68+
warehouse: "".to_string(),
69+
props: HashMap::new(),
70+
},
71+
storage_factory: None,
72+
}
6673
}
6774
}
6875

6976
impl CatalogBuilder for HmsCatalogBuilder {
7077
type C = HmsCatalog;
7178

79+
fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
80+
self.storage_factory = Some(storage_factory);
81+
self
82+
}
83+
7284
fn load(
7385
mut self,
7486
name: impl Into<String>,
7587
props: HashMap<String, String>,
7688
) -> impl Future<Output = Result<Self::C>> + Send {
77-
self.0.name = Some(name.into());
89+
self.config.name = Some(name.into());
7890

7991
if props.contains_key(HMS_CATALOG_PROP_URI) {
80-
self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
92+
self.config.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
8193
}
8294

8395
if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
84-
self.0.thrift_transport = match tt.to_lowercase().as_str() {
96+
self.config.thrift_transport = match tt.to_lowercase().as_str() {
8597
THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
8698
THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
8799
_ => HmsThriftTransport::default(),
88100
};
89101
}
90102

91103
if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
92-
self.0.warehouse = props
104+
self.config.warehouse = props
93105
.get(HMS_CATALOG_PROP_WAREHOUSE)
94106
.cloned()
95107
.unwrap_or_default();
96108
}
97109

98-
self.0.props = props
110+
self.config.props = props
99111
.into_iter()
100112
.filter(|(k, _)| {
101113
k != HMS_CATALOG_PROP_URI
@@ -105,23 +117,23 @@ impl CatalogBuilder for HmsCatalogBuilder {
105117
.collect();
106118

107119
let result = {
108-
if self.0.name.is_none() {
120+
if self.config.name.is_none() {
109121
Err(Error::new(
110122
ErrorKind::DataInvalid,
111123
"Catalog name is required",
112124
))
113-
} else if self.0.address.is_empty() {
125+
} else if self.config.address.is_empty() {
114126
Err(Error::new(
115127
ErrorKind::DataInvalid,
116128
"Catalog address is required",
117129
))
118-
} else if self.0.warehouse.is_empty() {
130+
} else if self.config.warehouse.is_empty() {
119131
Err(Error::new(
120132
ErrorKind::DataInvalid,
121133
"Catalog warehouse is required",
122134
))
123135
} else {
124-
HmsCatalog::new(self.0)
136+
HmsCatalog::new(self.config, self.storage_factory)
125137
}
126138
};
127139

@@ -169,7 +181,10 @@ impl Debug for HmsCatalog {
169181

170182
impl HmsCatalog {
171183
/// Create a new hms catalog.
172-
fn new(config: HmsCatalogConfig) -> Result<Self> {
184+
fn new(
185+
config: HmsCatalogConfig,
186+
storage_factory: Option<Arc<dyn StorageFactory>>,
187+
) -> Result<Self> {
173188
let address = config
174189
.address
175190
.as_str()
@@ -194,9 +209,15 @@ impl HmsCatalog {
194209
.build(),
195210
};
196211

197-
let file_io = FileIO::from_path(&config.warehouse)?
212+
let factory = storage_factory.ok_or_else(|| {
213+
Error::new(
214+
ErrorKind::Unexpected,
215+
"StorageFactory must be provided for HmsCatalog. Use `with_storage_factory` to configure it.",
216+
)
217+
})?;
218+
let file_io = FileIOBuilder::new(factory)
198219
.with_props(&config.props)
199-
.build()?;
220+
.build();
200221

201222
Ok(Self {
202223
config,

0 commit comments

Comments
 (0)