Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,33 @@ uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }

# crates.io dependencies
aws-smithy-runtime-api = { version = "1.7" }
aws-smithy-runtime = { version = "1.7", optional = true }
aws-smithy-runtime-api = { version = "1.8" }
aws-smithy-runtime = { version = "1.8", optional = true }
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
aws-config = { version = "1.5", default-features = false, features = [
aws-config = { version = "1.8", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
"credentials-process",
] }
aws-sdk-dynamodb = { version = "=1.79.0", default-features = false, features = [
aws-sdk-dynamodb = { version = "1.93.0", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
] }
aws-sdk-sts = { version = "=1.73.0", default-features = false, features = [
aws-sdk-sts = { version = "1.86.0", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
] }
backon = { version = "1", default-features = false, features = ["tokio-sleep"] }
hyper-tls = { version = "0.5", optional = true }
maplit = "1"

[dev-dependencies]
deltalake-core = { path = "../core" }
chrono = { workspace = true }
serial_test = "3"
deltalake-core = { path = "../core" }
deltalake-test = { path = "../test" }
pretty_env_logger = "0.5.0"
rand = "0.8"
serde_json = { workspace = true }
serial_test = "3"

[features]
default = ["rustls"]
Expand Down
80 changes: 52 additions & 28 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ impl ProvideCredentials for OptionsCredentialsProvider {
#[cfg(test)]
mod options_tests {
use super::*;
use maplit::hashmap;

#[test]
fn test_empty_options_error() {
Expand All @@ -181,10 +180,10 @@ mod options_tests {

#[test]
fn test_uppercase_options_resolve() {
let options = hashmap! {
"AWS_ACCESS_KEY_ID".into() => "key".into(),
"AWS_SECRET_ACCESS_KEY".into() => "secret".into(),
};
let options = HashMap::from([
("AWS_ACCESS_KEY_ID".into(), "key".into()),
("AWS_SECRET_ACCESS_KEY".into(), "secret".into()),
]);
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve");
Expand All @@ -195,10 +194,10 @@ mod options_tests {

#[test]
fn test_lowercase_options_resolve() {
let options = hashmap! {
"aws_access_key_id".into() => "key".into(),
"aws_secret_access_key".into() => "secret".into(),
};
let options = HashMap::from([
("AWS_ACCESS_KEY_ID".into(), "key".into()),
("AWS_SECRET_ACCESS_KEY".into(), "secret".into()),
]);
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve");
Expand Down Expand Up @@ -295,17 +294,21 @@ pub async fn resolve_credentials(options: &HashMap<String, String>) -> DeltaResu
#[cfg(test)]
mod tests {
use super::*;
use crate::constants;
use maplit::hashmap;
use serial_test::serial;

#[tokio::test]
#[serial]
async fn test_options_credentials_provider() {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
};
let options = HashMap::from([
(
constants::AWS_ACCESS_KEY_ID.to_string(),
"test_id".to_string(),
),
(
constants::AWS_SECRET_ACCESS_KEY.to_string(),
"test_secret".to_string(),
),
]);

let config = resolve_credentials(&options).await;
assert!(config.is_ok(), "{config:?}");
Expand Down Expand Up @@ -334,11 +337,20 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_options_credentials_provider_session_token() {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
constants::AWS_SESSION_TOKEN.to_string() => "test_token".to_string(),
};
let options = HashMap::from([
(
constants::AWS_ACCESS_KEY_ID.to_string(),
"test_id".to_string(),
),
(
constants::AWS_SECRET_ACCESS_KEY.to_string(),
"test_secret".to_string(),
),
(
constants::AWS_SESSION_TOKEN.to_string(),
"test_token".to_string(),
),
]);

let config = resolve_credentials(&options)
.await
Expand All @@ -362,10 +374,16 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_object_store_credential_provider() -> DeltaResult<()> {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
};
let options = HashMap::from([
(
constants::AWS_ACCESS_KEY_ID.to_string(),
"test_id".to_string(),
),
(
constants::AWS_SECRET_ACCESS_KEY.to_string(),
"test_secret".to_string(),
),
]);
let sdk_config = resolve_credentials(&options)
.await
.expect("Failed to resolve credentials for the test");
Expand All @@ -386,10 +404,16 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
};
let options = HashMap::from([
(
constants::AWS_ACCESS_KEY_ID.to_string(),
"test_id".to_string(),
),
(
constants::AWS_SECRET_ACCESS_KEY.to_string(),
"test_secret".to_string(),
),
]);
let sdk_config = resolve_credentials(&options)
.await
.expect("Failed to resolve credentijals for the test");
Expand Down
63 changes: 42 additions & 21 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,16 @@ impl DynamoDbLockClient {
}

fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap<String, AttributeValue> {
maplit::hashmap! {
constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path),
constants::ATTR_FILE_NAME.to_owned() => string_attr(format!("{version:020}.json")),
}
HashMap::from([
(
constants::ATTR_TABLE_PATH.to_owned(),
string_attr(table_path),
),
(
constants::ATTR_FILE_NAME.to_owned(),
string_attr(format!("{version:020}.json")),
),
])
}

/// Read a log entry from DynamoDb.
Expand Down Expand Up @@ -434,9 +440,10 @@ impl DynamoDbLockClient {
.limit(limit.try_into().unwrap_or(i32::MAX))
.scan_index_forward(false)
.key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH))
.set_expression_attribute_values(Some(
maplit::hashmap!(":tn".into() => string_attr(table_path)),
))
.set_expression_attribute_values(Some(HashMap::from([(
":tn".into(),
string_attr(table_path),
)])))
.send()
.await
},
Expand Down Expand Up @@ -483,11 +490,11 @@ impl DynamoDbLockClient {
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.update_expression("SET complete = :c, expireTime = :e".to_owned())
.set_expression_attribute_values(Some(maplit::hashmap! {
":c".to_owned() => string_attr("true"),
":e".to_owned() => num_attr(seconds_since_epoch),
":f".into() => string_attr("false"),
}))
.set_expression_attribute_values(Some(HashMap::from([
(":c".to_owned(), string_attr("true")),
(":e".to_owned(), num_attr(seconds_since_epoch)),
(":f".into(), string_attr("false")),
])))
.condition_expression(constants::CONDITION_UPDATE_INCOMPLETE)
.send()
.await?;
Expand Down Expand Up @@ -529,9 +536,10 @@ impl DynamoDbLockClient {
.delete_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.set_expression_attribute_values(Some(maplit::hashmap! {
":f".into() => string_attr("false"),
}))
.set_expression_attribute_values(Some(HashMap::from([(
":f".into(),
string_attr("false"),
)])))
.condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str())
.send()
.await?;
Expand Down Expand Up @@ -627,12 +635,25 @@ fn create_value_map(
) -> HashMap<String, AttributeValue> {
// cut off `_delta_log` part: temp_path in DynamoDb is relative to `_delta_log` not table root.
let temp_path = Path::from_iter(commit_entry.temp_path.parts().skip(1));
let mut value_map = maplit::hashmap! {
constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path),
constants::ATTR_FILE_NAME.to_owned() => string_attr(format!("{:020}.json", commit_entry.version)),
constants::ATTR_TEMP_PATH.to_owned() => string_attr(temp_path),
constants::ATTR_COMPLETE.to_owned() => string_attr(if commit_entry.complete { "true" } else { "false" }),
};
let mut value_map = HashMap::from([
(
constants::ATTR_TABLE_PATH.to_owned(),
string_attr(table_path),
),
(
constants::ATTR_FILE_NAME.to_owned(),
string_attr(format!("{:020}.json", commit_entry.version)),
),
(constants::ATTR_TEMP_PATH.to_owned(), string_attr(temp_path)),
(
constants::ATTR_COMPLETE.to_owned(),
string_attr(if commit_entry.complete {
"true"
} else {
"false"
}),
),
]);
commit_entry.expire_time.as_ref().map(|t| {
value_map.insert(
constants::ATTR_EXPIRE_TIME.to_owned(),
Expand Down
Loading
Loading