Skip to content

Commit 97f9bd0

Browse files
authored
[5/N] [storage] Support S3 for deltalake access (#2189)
1 parent c80de59 commit 97f9bd0

File tree

6 files changed

+180
-16
lines changed

6 files changed

+180
-16
lines changed

Cargo.lock

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ const_format = "0.2"
4040
crc32fast = "1"
4141
datafusion = "50"
4242
datafusion-cli = "50"
43-
deltalake = "0.29"
43+
deltalake = { version = "0.29", features = ["deltalake-aws", "deltalake-gcp"] }
4444
fastbloom = "0.14"
4545
futures = { version = "0.3", default-features = false }
4646
hashbrown = "0.16"

src/moonlink/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ storage-fs = ["opendal/services-fs", "iceberg/storage-fs"]
1818
storage-s3 = [
1919
"opendal/services-s3",
2020
"iceberg/storage-s3",
21+
"deltalake/s3",
2122
"base64",
2223
"hmac",
2324
"sha1",

src/moonlink/src/storage/mooncake_table/table_creation_test_utils.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,42 @@ pub(crate) fn create_iceberg_table_config(warehouse_uri: String) -> IcebergTable
168168
}
169169
}
170170

171+
/// Test util function to create delta table config.
172+
#[allow(unused)]
173+
pub(crate) fn create_delta_table_config(warehouse_uri: String) -> DeltalakeTableConfig {
174+
let accessor_config = if warehouse_uri.starts_with("s3://") {
175+
#[cfg(feature = "storage-s3")]
176+
{
177+
s3_test_utils::create_s3_storage_config(&warehouse_uri)
178+
}
179+
#[cfg(not(feature = "storage-s3"))]
180+
{
181+
panic!("S3 support not enabled. Enable `storage-s3` feature.");
182+
}
183+
} else if warehouse_uri.starts_with("gs://") {
184+
#[cfg(feature = "storage-gcs")]
185+
{
186+
gcs_test_utils::create_gcs_storage_config(&warehouse_uri)
187+
}
188+
#[cfg(not(feature = "storage-gcs"))]
189+
{
190+
panic!("GCS support not enabled. Enable `storage-gcs` feature.");
191+
}
192+
} else {
193+
let storage_config = StorageConfig::FileSystem {
194+
root_directory: warehouse_uri.clone(),
195+
atomic_write_dir: None,
196+
};
197+
AccessorConfig::new_with_storage_config(storage_config)
198+
};
199+
200+
DeltalakeTableConfig {
201+
table_name: DELTA_TEST_TABLE.to_string(),
202+
location: warehouse_uri,
203+
data_accessor_config: accessor_config,
204+
}
205+
}
206+
171207
/// Test util function to create arrow schema.
172208
pub(crate) fn create_test_arrow_schema() -> Arc<ArrowSchema> {
173209
Arc::new(ArrowSchema::new(vec![

src/moonlink/src/storage/table/deltalake/test.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@ use std::collections::{HashMap, HashSet};
22
use std::sync::Arc;
33
use tempfile::TempDir;
44

5+
use crate::storage::filesystem::accessor::factory::create_filesystem_accessor;
6+
#[cfg(feature = "storage-s3")]
7+
use crate::storage::filesystem::s3::s3_test_utils;
8+
#[cfg(feature = "storage-s3")]
9+
use crate::storage::filesystem::s3::test_guard::TestGuard as S3TestGuard;
10+
#[cfg(feature = "storage-s3")]
11+
use crate::storage::mooncake_table::table_creation_test_utils::create_delta_table_config;
512
use crate::storage::mooncake_table::table_creation_test_utils::{
613
create_test_table_metadata, get_delta_table_config,
714
};
@@ -12,17 +19,16 @@ use crate::storage::mooncake_table::{
1219
};
1320
use crate::storage::table::common::table_manager::TableManager;
1421
use crate::storage::table::common::table_manager::{PersistenceFileParams, PersistenceResult};
22+
use crate::storage::table::deltalake::deltalake_table_config::DeltalakeTableConfig;
1523
use crate::storage::table::deltalake::deltalake_table_manager::DeltalakeTableManager;
16-
use crate::{create_data_file, FileSystemAccessor, ObjectStorageCache};
24+
use crate::{create_data_file, ObjectStorageCache};
1725

18-
#[tokio::test]
19-
async fn test_basic_store_and_load() {
26+
async fn test_basic_store_and_load_impl(delta_table_config: DeltalakeTableConfig) {
2027
let temp_dir = TempDir::new().unwrap();
21-
let table_path = temp_dir.path().to_str().unwrap().to_string();
28+
let table_path = delta_table_config.location.clone();
2229
let mooncake_table_metadata = create_test_table_metadata(table_path.clone());
23-
let filesystem_accessor = FileSystemAccessor::default_for_test(&temp_dir);
24-
let delta_table_config = get_delta_table_config(&temp_dir);
25-
30+
let filesystem_accessor =
31+
create_filesystem_accessor(delta_table_config.data_accessor_config.clone());
2632
let mut delta_table_manager = DeltalakeTableManager::new(
2733
mooncake_table_metadata.clone(),
2834
Arc::new(ObjectStorageCache::default_for_test(&temp_dir)), // Use independent object storage cache.
@@ -137,11 +143,26 @@ async fn test_basic_store_and_load() {
137143
assert_eq!(snapshot.disk_files.len(), 1);
138144
assert_eq!(snapshot.flush_lsn.unwrap(), flush_lsn);
139145

140-
// Drop table and check.
146+
// Drop the table.
141147
delta_table_manager.drop_table().await.unwrap();
142-
// Explicitly drop the file handle to release the reference count within the unix filesystem.
143-
drop(temp_dir);
148+
// If the delta table lives on the local filesystem, it may still be referenced by the temp directory variable, and therefore still exist on disk.
149+
}
150+
151+
#[tokio::test]
152+
async fn test_basic_store_and_load() {
153+
let temp_dir = TempDir::new().unwrap();
154+
let delta_table_config = get_delta_table_config(&temp_dir);
155+
156+
test_basic_store_and_load_impl(delta_table_config).await;
157+
}
158+
159+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
160+
#[cfg(feature = "storage-s3")]
161+
async fn test_basic_store_and_load_with_s3() {
162+
deltalake::aws::register_handlers(None);
163+
let (bucket, warehouse_uri) = s3_test_utils::get_test_s3_bucket_and_warehouse();
164+
let _test_guard = S3TestGuard::new(bucket.clone()).await;
165+
let delta_table_config = create_delta_table_config(warehouse_uri);
144166

145-
let dir_exists = tokio::fs::try_exists(table_path).await.unwrap();
146-
assert!(!dir_exists);
167+
test_basic_store_and_load_impl(delta_table_config).await;
147168
}

src/moonlink/src/storage/table/deltalake/utils.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use deltalake::kernel::engine::arrow_conversion::TryFromArrow;
2-
use deltalake::{open_table, operations::create::CreateBuilder, DeltaTable};
2+
use deltalake::open_table_with_storage_options;
3+
use deltalake::{operations::create::CreateBuilder, DeltaTable};
4+
use std::collections::HashMap;
35
use std::sync::Arc;
46
use url::Url;
57

68
use crate::storage::filesystem::accessor::base_filesystem_accessor::BaseFileSystemAccess;
79
use crate::storage::mooncake_table::TableMetadata as MooncakeTableMetadata;
810
use crate::storage::table::deltalake::deltalake_table_config::DeltalakeTableConfig;
11+
use crate::storage::StorageConfig as MoonlinkStorgaeConfig;
912
use crate::CacheTrait;
1013
use crate::Result;
1114

@@ -26,6 +29,37 @@ fn sanitize_deltalake_table_location(location: &str) -> String {
2629
}
2730
}
2831

32+
/// Get the storage options used to access a deltalake table.
33+
fn get_storage_option(storage_config: &MoonlinkStorgaeConfig) -> HashMap<String, String> {
34+
#[allow(unused_mut)]
35+
let mut storage_options = HashMap::new();
36+
37+
match storage_config {
38+
#[cfg(feature = "storage-s3")]
39+
MoonlinkStorgaeConfig::S3 {
40+
access_key_id,
41+
secret_access_key,
42+
region,
43+
bucket: _,
44+
endpoint,
45+
} => {
46+
storage_options.insert("AWS_ACCESS_KEY_ID".into(), access_key_id.clone());
47+
storage_options.insert("AWS_SECRET_ACCESS_KEY".into(), secret_access_key.clone());
48+
storage_options.insert("AWS_REGION".into(), region.clone());
49+
50+
if let Some(endpoint) = endpoint {
51+
storage_options.insert("AWS_ENDPOINT_URL".into(), endpoint.clone());
52+
// Used for MinIO/S3-compatible storage.
53+
storage_options.insert("AWS_ALLOW_HTTP".into(), "true".into());
54+
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
55+
}
56+
}
57+
_ => {}
58+
}
59+
60+
storage_options
61+
}
62+
2963
/// Get or create a Delta table at the given location.
3064
///
3165
/// - If the table doesn't exist → create a new one using the Arrow schema.
@@ -38,8 +72,10 @@ pub(crate) async fn get_or_create_deltalake_table(
3872
_filesystem_accessor: Arc<dyn BaseFileSystemAccess>,
3973
config: DeltalakeTableConfig,
4074
) -> Result<DeltaTable> {
75+
let storage_options = get_storage_option(&config.data_accessor_config.storage_config);
4176
let table_location = sanitize_deltalake_table_location(&config.location);
42-
match open_table(Url::parse(&table_location)?).await {
77+
let table_url = Url::parse(&table_location)?;
78+
match open_table_with_storage_options(table_url, storage_options.clone()).await {
4379
Ok(existing_table) => Ok(existing_table),
4480
Err(_) => {
4581
let arrow_schema = mooncake_table_metadata.schema.as_ref();
@@ -51,6 +87,7 @@ pub(crate) async fn get_or_create_deltalake_table(
5187
.with_location(config.location.clone())
5288
.with_columns(delta_schema_fields)
5389
.with_save_mode(deltalake::protocol::SaveMode::ErrorIfExists)
90+
.with_storage_options(storage_options)
5491
.await?;
5592
Ok(table)
5693
}
@@ -75,7 +112,8 @@ pub(crate) async fn get_deltalake_table_if_exists(
75112
config: &DeltalakeTableConfig,
76113
) -> Result<Option<DeltaTable>> {
77114
let table_url = get_deltalake_table_url(&config.location)?;
78-
match open_table(table_url).await {
115+
let storage_options = get_storage_option(&config.data_accessor_config.storage_config);
116+
match open_table_with_storage_options(table_url, storage_options).await {
79117
Ok(table) => Ok(Some(table)),
80118
Err(_) => Ok(None),
81119
}

0 commit comments

Comments
 (0)