diff --git a/Cargo.lock b/Cargo.lock
index 60a1f7887..feddccd84 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -624,6 +624,28 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "aws-sdk-dynamodb"
+version = "1.95.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "699bab2e1f9da570071889b674a43df1184b6ec42bf4770ef8758a8f3e02f38f"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes",
+ "fastrand",
+ "http 0.2.12",
+ "regex-lite",
+ "tracing",
+]
+
 [[package]]
 name = "aws-sdk-glue"
 version = "1.125.0"
@@ -2738,7 +2760,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c1bd4a6c5d1d85a24214ab8e245f97425b1adc330bbaba2b866d093190aee6f9"
 dependencies = [
  "delta_kernel",
+ "deltalake-aws",
  "deltalake-core",
+ "deltalake-gcp",
+]
+
+[[package]]
+name = "deltalake-aws"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8bb6fa32f92bd7393a67f45a27b61c1a6eb90c184e22771f87ee2186d33fc101"
+dependencies = [
+ "async-trait",
+ "aws-config",
+ "aws-credential-types",
+ "aws-sdk-dynamodb",
+ "aws-sdk-sts",
+ "aws-smithy-runtime-api",
+ "backon",
+ "bytes",
+ "chrono",
+ "deltalake-core",
+ "futures",
+ "object_store",
+ "regex",
+ "thiserror",
+ "tokio",
+ "tracing",
+ "url",
+ "uuid",
 ]
 
 [[package]]
@@ -2768,6 +2818,7 @@ dependencies = [
  "dirs",
  "either",
  "futures",
+ "humantime",
  "indexmap 2.12.0",
  "itertools 0.14.0",
  "num_cpus",
@@ -2804,6 +2855,23 @@ dependencies = [
  "syn 2.0.108",
 ]
 
+[[package]]
+name = "deltalake-gcp"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8df2708856cfa92e8309141fda1ee8d41e174b0868f26e557d0b2ea8d30fb92e"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "deltalake-core",
+ "futures",
+ "object_store",
+ "thiserror",
+ "tokio",
+ "tracing",
+ "url",
+]
+
 [[package]]
 name = "der"
 version = "0.7.10"
diff --git a/Cargo.toml b/Cargo.toml
index d9c629318..c2a191e40 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,7 +40,7 @@ const_format = "0.2"
 crc32fast = "1"
 datafusion = "50"
 datafusion-cli = "50"
-deltalake = "0.29"
+deltalake = { version = "0.29", features = ["deltalake-aws", "deltalake-gcp"] }
 fastbloom = "0.14"
 futures = { version = "0.3", default-features = false }
 hashbrown = "0.16"
diff --git a/src/moonlink/Cargo.toml b/src/moonlink/Cargo.toml
index 72dfb779f..ecda77866 100644
--- a/src/moonlink/Cargo.toml
+++ b/src/moonlink/Cargo.toml
@@ -18,6 +18,7 @@ storage-fs = ["opendal/services-fs", "iceberg/storage-fs"]
 storage-s3 = [
     "opendal/services-s3",
     "iceberg/storage-s3",
+    "deltalake/s3",
     "base64",
     "hmac",
     "sha1",
diff --git a/src/moonlink/src/storage/mooncake_table/table_creation_test_utils.rs b/src/moonlink/src/storage/mooncake_table/table_creation_test_utils.rs
index 8c9f93cf0..0f444e35d 100644
--- a/src/moonlink/src/storage/mooncake_table/table_creation_test_utils.rs
+++ b/src/moonlink/src/storage/mooncake_table/table_creation_test_utils.rs
@@ -168,6 +168,42 @@ pub(crate) fn create_iceberg_table_config(warehouse_uri: String) -> IcebergTable
     }
 }
 
+/// Test util function to create a delta table config; the accessor is picked from the URI scheme (`s3://`, `gs://`, otherwise local filesystem).
+#[allow(unused)]
+pub(crate) fn create_delta_table_config(warehouse_uri: String) -> DeltalakeTableConfig {
+    let accessor_config = if warehouse_uri.starts_with("s3://") {
+        #[cfg(feature = "storage-s3")]
+        {
+            s3_test_utils::create_s3_storage_config(&warehouse_uri)
+        }
+        #[cfg(not(feature = "storage-s3"))]
+        {
+            panic!("S3 support not enabled. Enable `storage-s3` feature.");
+        }
+    } else if warehouse_uri.starts_with("gs://") {
+        #[cfg(feature = "storage-gcs")]
+        {
+            gcs_test_utils::create_gcs_storage_config(&warehouse_uri)
+        }
+        #[cfg(not(feature = "storage-gcs"))]
+        {
+            panic!("GCS support not enabled. Enable `storage-gcs` feature.");
+        }
+    } else {
+        let storage_config = StorageConfig::FileSystem {
+            root_directory: warehouse_uri.clone(),
+            atomic_write_dir: None,
+        };
+        AccessorConfig::new_with_storage_config(storage_config)
+    };
+
+    DeltalakeTableConfig {
+        table_name: DELTA_TEST_TABLE.to_string(),
+        location: warehouse_uri,
+        data_accessor_config: accessor_config,
+    }
+}
+
 /// Test util function to create arrow schema.
 pub(crate) fn create_test_arrow_schema() -> Arc<ArrowSchema> {
     Arc::new(ArrowSchema::new(vec![
diff --git a/src/moonlink/src/storage/table/deltalake/test.rs b/src/moonlink/src/storage/table/deltalake/test.rs
index 6f0ebd37d..7e9d8e39e 100644
--- a/src/moonlink/src/storage/table/deltalake/test.rs
+++ b/src/moonlink/src/storage/table/deltalake/test.rs
@@ -2,6 +2,13 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tempfile::TempDir;
 
+use crate::storage::filesystem::accessor::factory::create_filesystem_accessor;
+#[cfg(feature = "storage-s3")]
+use crate::storage::filesystem::s3::s3_test_utils;
+#[cfg(feature = "storage-s3")]
+use crate::storage::filesystem::s3::test_guard::TestGuard as S3TestGuard;
+#[cfg(feature = "storage-s3")]
+use crate::storage::mooncake_table::table_creation_test_utils::create_delta_table_config;
 use crate::storage::mooncake_table::table_creation_test_utils::{
     create_test_table_metadata, get_delta_table_config,
 };
@@ -12,17 +19,16 @@ use crate::storage::mooncake_table::{
 };
 use crate::storage::table::common::table_manager::TableManager;
 use crate::storage::table::common::table_manager::{PersistenceFileParams, PersistenceResult};
+use crate::storage::table::deltalake::deltalake_table_config::DeltalakeTableConfig;
 use crate::storage::table::deltalake::deltalake_table_manager::DeltalakeTableManager;
-use crate::{create_data_file, FileSystemAccessor, ObjectStorageCache};
+use crate::{create_data_file, ObjectStorageCache};
 
-#[tokio::test]
-async fn test_basic_store_and_load() {
+async fn test_basic_store_and_load_impl(delta_table_config: DeltalakeTableConfig) {
     let temp_dir = TempDir::new().unwrap();
-    let table_path = temp_dir.path().to_str().unwrap().to_string();
+    let table_path = delta_table_config.location.clone();
     let mooncake_table_metadata = create_test_table_metadata(table_path.clone());
-    let filesystem_accessor = FileSystemAccessor::default_for_test(&temp_dir);
-    let delta_table_config = get_delta_table_config(&temp_dir);
-
+    let filesystem_accessor =
+        create_filesystem_accessor(delta_table_config.data_accessor_config.clone());
     let mut delta_table_manager = DeltalakeTableManager::new(
         mooncake_table_metadata.clone(),
         Arc::new(ObjectStorageCache::default_for_test(&temp_dir)), // Use independent object storage cache.
@@ -137,11 +143,26 @@ async fn test_basic_store_and_load() {
     assert_eq!(snapshot.disk_files.len(), 1);
     assert_eq!(snapshot.flush_lsn.unwrap(), flush_lsn);
 
-    // Drop table and check.
+    // Drop the table.
     delta_table_manager.drop_table().await.unwrap();
-    // Explicitly drop the file handle to release the reference count within the unix filesystem.
-    drop(temp_dir);
+    // A table on the local filesystem may still be referenced by the caller's temp directory handle, so it can remain on disk; no existence check is made here.
+}
+
+#[tokio::test]
+async fn test_basic_store_and_load() {
+    let temp_dir = TempDir::new().unwrap();
+    let delta_table_config = get_delta_table_config(&temp_dir);
+
+    test_basic_store_and_load_impl(delta_table_config).await;
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[cfg(feature = "storage-s3")]
+async fn test_basic_store_and_load_with_s3() {
+    deltalake::aws::register_handlers(None);
+    let (bucket, warehouse_uri) = s3_test_utils::get_test_s3_bucket_and_warehouse();
+    let _test_guard = S3TestGuard::new(bucket.clone()).await;
+    let delta_table_config = create_delta_table_config(warehouse_uri);
 
-    let dir_exists = tokio::fs::try_exists(table_path).await.unwrap();
-    assert!(!dir_exists);
+    test_basic_store_and_load_impl(delta_table_config).await;
 }
diff --git a/src/moonlink/src/storage/table/deltalake/utils.rs b/src/moonlink/src/storage/table/deltalake/utils.rs
index f3c0c4708..8c17e1a29 100644
--- a/src/moonlink/src/storage/table/deltalake/utils.rs
+++ b/src/moonlink/src/storage/table/deltalake/utils.rs
@@ -1,11 +1,14 @@
 use deltalake::kernel::engine::arrow_conversion::TryFromArrow;
-use deltalake::{open_table, operations::create::CreateBuilder, DeltaTable};
+use deltalake::open_table_with_storage_options;
+use deltalake::{operations::create::CreateBuilder, DeltaTable};
+use std::collections::HashMap;
 use std::sync::Arc;
 use url::Url;
 
 use crate::storage::filesystem::accessor::base_filesystem_accessor::BaseFileSystemAccess;
 use crate::storage::mooncake_table::TableMetadata as MooncakeTableMetadata;
 use crate::storage::table::deltalake::deltalake_table_config::DeltalakeTableConfig;
+use crate::storage::StorageConfig as MoonlinkStorageConfig;
 use crate::CacheTrait;
 use crate::Result;
 
@@ -26,6 +29,37 @@ fn sanitize_deltalake_table_location(location: &str) -> String {
     }
 }
 
+/// Build the deltalake storage options (credentials, region, optional endpoint) for an S3 storage config; any other config yields an empty map.
+fn get_storage_option(storage_config: &MoonlinkStorageConfig) -> HashMap<String, String> {
+    #[allow(unused_mut)]
+    let mut storage_options = HashMap::new();
+
+    match storage_config {
+        #[cfg(feature = "storage-s3")]
+        MoonlinkStorageConfig::S3 {
+            access_key_id,
+            secret_access_key,
+            region,
+            bucket: _,
+            endpoint,
+        } => {
+            storage_options.insert("AWS_ACCESS_KEY_ID".into(), access_key_id.clone());
+            storage_options.insert("AWS_SECRET_ACCESS_KEY".into(), secret_access_key.clone());
+            storage_options.insert("AWS_REGION".into(), region.clone());
+
+            if let Some(endpoint) = endpoint {
+                storage_options.insert("AWS_ENDPOINT_URL".into(), endpoint.clone());
+                // A custom endpoint is treated as MinIO/S3-compatible storage: allow plain HTTP and unsafe rename.
+                storage_options.insert("AWS_ALLOW_HTTP".into(), "true".into());
+                storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
+            }
+        }
+        _ => {}
+    }
+
+    storage_options
+}
+
 /// Get or create a Delta table at the given location.
 ///
 /// - If the table doesn't exist → create a new one using the Arrow schema.
@@ -38,8 +72,10 @@ pub(crate) async fn get_or_create_deltalake_table(
     _filesystem_accessor: Arc<dyn BaseFileSystemAccess>,
     config: DeltalakeTableConfig,
 ) -> Result<DeltaTable> {
+    let storage_options = get_storage_option(&config.data_accessor_config.storage_config);
     let table_location = sanitize_deltalake_table_location(&config.location);
-    match open_table(Url::parse(&table_location)?).await {
+    let table_url = Url::parse(&table_location)?;
+    match open_table_with_storage_options(table_url, storage_options.clone()).await {
         Ok(existing_table) => Ok(existing_table),
         Err(_) => {
             let arrow_schema = mooncake_table_metadata.schema.as_ref();
@@ -51,6 +87,7 @@ pub(crate) async fn get_or_create_deltalake_table(
                 .with_location(config.location.clone())
                 .with_columns(delta_schema_fields)
                 .with_save_mode(deltalake::protocol::SaveMode::ErrorIfExists)
+                .with_storage_options(storage_options)
                 .await?;
             Ok(table)
         }
@@ -75,7 +112,8 @@ pub(crate) async fn get_deltalake_table_if_exists(
     config: &DeltalakeTableConfig,
 ) -> Result<Option<DeltaTable>> {
     let table_url = get_deltalake_table_url(&config.location)?;
-    match open_table(table_url).await {
+    let storage_options = get_storage_option(&config.data_accessor_config.storage_config);
+    match open_table_with_storage_options(table_url, storage_options).await {
         Ok(table) => Ok(Some(table)),
         Err(_) => Ok(None),
     }