Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const_format = "0.2"
crc32fast = "1"
datafusion = "50"
datafusion-cli = "50"
deltalake = "0.29"
deltalake = { version = "0.29", features = ["deltalake-aws", "deltalake-gcp"] }
fastbloom = "0.14"
futures = { version = "0.3", default-features = false }
hashbrown = "0.16"
Expand Down
2 changes: 2 additions & 0 deletions src/moonlink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ storage-fs = ["opendal/services-fs", "iceberg/storage-fs"]
storage-s3 = [
"opendal/services-s3",
"iceberg/storage-s3",
"deltalake/s3",
"base64",
"hmac",
"sha1",
Expand All @@ -29,6 +30,7 @@ storage-gcs = [
"opendal/services-s3",
"iceberg/storage-gcs",
"iceberg/storage-s3",
"deltalake/gcs",
"base64",
"hmac",
"sha1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,41 @@ pub(crate) fn create_iceberg_table_config(warehouse_uri: String) -> IcebergTable
}
}

/// Test util function to create delta table config.
pub(crate) fn create_delta_table_config(warehouse_uri: String) -> DeltalakeTableConfig {
let accessor_config = if warehouse_uri.starts_with("s3://") {
#[cfg(feature = "storage-s3")]
{
s3_test_utils::create_s3_storage_config(&warehouse_uri)
}
#[cfg(not(feature = "storage-s3"))]
{
panic!("S3 support not enabled. Enable `storage-s3` feature.");
}
} else if warehouse_uri.starts_with("gs://") {
#[cfg(feature = "storage-gcs")]
{
gcs_test_utils::create_gcs_storage_config(&warehouse_uri)
}
#[cfg(not(feature = "storage-gcs"))]
{
panic!("GCS support not enabled. Enable `storage-gcs` feature.");
}
} else {
let storage_config = StorageConfig::FileSystem {
root_directory: warehouse_uri.clone(),
atomic_write_dir: None,
};
AccessorConfig::new_with_storage_config(storage_config)
};

DeltalakeTableConfig {
table_name: DELTA_TEST_TABLE.to_string(),
location: warehouse_uri,
data_accessor_config: accessor_config,
}
}

/// Test util function to create arrow schema.
pub(crate) fn create_test_arrow_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![
Expand Down
39 changes: 32 additions & 7 deletions src/moonlink/src/storage/table/deltalake/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tempfile::TempDir;

use crate::storage::filesystem::accessor::factory::create_filesystem_accessor;
#[cfg(feature = "storage-s3")]
use crate::storage::filesystem::s3::s3_test_utils;
#[cfg(feature = "storage-s3")]
use crate::storage::filesystem::s3::test_guard::TestGuard as S3TestGuard;
#[cfg(feature = "storage-s3")]
use crate::storage::mooncake_table::table_creation_test_utils::create_delta_table_config;
use crate::storage::mooncake_table::table_creation_test_utils::{
create_test_table_metadata, get_delta_table_config,
};
Expand All @@ -12,17 +19,16 @@ use crate::storage::mooncake_table::{
};
use crate::storage::table::common::table_manager::TableManager;
use crate::storage::table::common::table_manager::{PersistenceFileParams, PersistenceResult};
use crate::storage::table::deltalake::deltalake_table_config::DeltalakeTableConfig;
use crate::storage::table::deltalake::deltalake_table_manager::DeltalakeTableManager;
use crate::{create_data_file, FileSystemAccessor, ObjectStorageCache};
use crate::{create_data_file, ObjectStorageCache};

#[tokio::test]
async fn test_basic_store_and_load() {
async fn test_basic_store_and_load_impl(delta_table_config: DeltalakeTableConfig) {
let temp_dir = TempDir::new().unwrap();
let table_path = temp_dir.path().to_str().unwrap().to_string();
let table_path = delta_table_config.location.clone();
let mooncake_table_metadata = create_test_table_metadata(table_path.clone());
let filesystem_accessor = FileSystemAccessor::default_for_test(&temp_dir);
let delta_table_config = get_delta_table_config(&temp_dir);

let filesystem_accessor =
create_filesystem_accessor(delta_table_config.data_accessor_config.clone());
let mut delta_table_manager = DeltalakeTableManager::new(
mooncake_table_metadata.clone(),
Arc::new(ObjectStorageCache::default_for_test(&temp_dir)), // Use independent object storage cache.
Expand Down Expand Up @@ -145,3 +151,22 @@ async fn test_basic_store_and_load() {
let dir_exists = tokio::fs::try_exists(table_path).await.unwrap();
assert!(!dir_exists);
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: S3 URI Check Fails in Local Filesystem Function

The test_basic_store_and_load_impl function uses tokio::fs::try_exists(table_path) to verify table deletion. This check fails for S3 test cases because table_path is an S3 URI, and tokio::fs::try_exists only works with local filesystem paths.

Fix in Cursor Fix in Web

#[tokio::test]
async fn test_basic_store_and_load() {
let temp_dir = TempDir::new().unwrap();
let delta_table_config = get_delta_table_config(&temp_dir);

test_basic_store_and_load_impl(delta_table_config).await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(feature = "storage-s3")]
async fn test_basic_store_and_load_with_s3() {
deltalake::aws::register_handlers(None);
let (bucket, warehouse_uri) = s3_test_utils::get_test_s3_bucket_and_warehouse();
let _test_guard = S3TestGuard::new(bucket.clone()).await;
let delta_table_config = create_delta_table_config(warehouse_uri);

test_basic_store_and_load_impl(delta_table_config).await;
}
44 changes: 41 additions & 3 deletions src/moonlink/src/storage/table/deltalake/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use deltalake::kernel::engine::arrow_conversion::TryFromArrow;
use deltalake::{open_table, operations::create::CreateBuilder, DeltaTable};
use deltalake::open_table_with_storage_options;
use deltalake::{operations::create::CreateBuilder, DeltaTable};
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

use crate::storage::filesystem::accessor::base_filesystem_accessor::BaseFileSystemAccess;
use crate::storage::mooncake_table::TableMetadata as MooncakeTableMetadata;
use crate::storage::table::deltalake::deltalake_table_config::DeltalakeTableConfig;
use crate::storage::StorageConfig as MoonlinkStorgaeConfig;
use crate::CacheTrait;
use crate::Result;

Expand All @@ -26,6 +29,37 @@ fn sanitize_deltalake_table_location(location: &str) -> String {
}
}

/// Get storage option to access deltalake table.
fn get_storage_option(storage_config: &MoonlinkStorgaeConfig) -> HashMap<String, String> {
#[allow(unused_mut)]
let mut storage_options = HashMap::new();

match storage_config {
#[cfg(feature = "storage-s3")]
MoonlinkStorgaeConfig::S3 {
access_key_id,
secret_access_key,
region,
bucket: _,
endpoint,
} => {
storage_options.insert("AWS_ACCESS_KEY_ID".into(), access_key_id.clone());
storage_options.insert("AWS_SECRET_ACCESS_KEY".into(), secret_access_key.clone());
storage_options.insert("AWS_REGION".into(), region.clone());

if let Some(endpoint) = endpoint {
storage_options.insert("AWS_ENDPOINT_URL".into(), endpoint.clone());
// Used for MinIO/S3-compatible storage.
storage_options.insert("AWS_ALLOW_HTTP".into(), "true".into());
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
}
}
_ => {}
}

storage_options
}

/// Get or create a Delta table at the given location.
///
/// - If the table doesn't exist → create a new one using the Arrow schema.
Expand All @@ -38,8 +72,10 @@ pub(crate) async fn get_or_create_deltalake_table(
_filesystem_accessor: Arc<dyn BaseFileSystemAccess>,
config: DeltalakeTableConfig,
) -> Result<DeltaTable> {
let storage_options = get_storage_option(&config.data_accessor_config.storage_config);
let table_location = sanitize_deltalake_table_location(&config.location);
match open_table(Url::parse(&table_location)?).await {
let table_url = Url::parse(&table_location)?;
match open_table_with_storage_options(table_url, storage_options.clone()).await {
Ok(existing_table) => Ok(existing_table),
Err(_) => {
let arrow_schema = mooncake_table_metadata.schema.as_ref();
Expand All @@ -51,6 +87,7 @@ pub(crate) async fn get_or_create_deltalake_table(
.with_location(config.location.clone())
.with_columns(delta_schema_fields)
.with_save_mode(deltalake::protocol::SaveMode::ErrorIfExists)
.with_storage_options(storage_options)
.await?;
Ok(table)
}
Expand All @@ -75,7 +112,8 @@ pub(crate) async fn get_deltalake_table_if_exists(
config: &DeltalakeTableConfig,
) -> Result<Option<DeltaTable>> {
let table_url = get_deltalake_table_url(&config.location)?;
match open_table(table_url).await {
let storage_options = get_storage_option(&config.data_accessor_config.storage_config);
match open_table_with_storage_options(table_url, storage_options).await {
Ok(table) => Ok(Some(table)),
Err(_) => Ok(None),
}
Expand Down
Loading