Skip to content
Merged
12 changes: 12 additions & 0 deletions .changelog/token_bucket_time_source.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
applies_to: ["client"]
authors: ["lnj"]
references: ["smithy-rs#4459"]
breaking: false
new_feature: false
bug_fix: true
---

Updated the `TokenBucket` creation to initialize the bucket with the user-provided `TimeSource` from the `Config`.
This fixes the bug in [issue 4459](https://github.com/smithy-lang/smithy-rs/issues/4459) that caused failures
in WASM since the TokenBucket was being created with a default `SystemTime` based `TimeSource`
83 changes: 83 additions & 0 deletions aws/sdk/integration-tests/s3/tests/token_bucket_time_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#![cfg(feature = "test-util")]

use aws_sdk_s3::{config::Region, Client, Config};
use aws_smithy_async::test_util::ManualTimeSource;
use aws_smithy_async::time::SharedTimeSource;
use aws_smithy_http_client::test_util::{ReplayEvent, StaticReplayClient};
use aws_smithy_runtime::client::retries::TokenBucket;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
use aws_smithy_runtime_api::client::interceptors::Intercept;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::config_bag::ConfigBag;
use std::sync::LazyLock;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

static THE_TIME: LazyLock<SystemTime> =
LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(12344321));

#[derive(Debug)]
struct TimeSourceValidationInterceptor;

impl Intercept for TimeSourceValidationInterceptor {
fn name(&self) -> &'static str {
"TimeSourceValidationInterceptor"
}

fn modify_before_transmit(
&self,
_context: &mut BeforeTransmitInterceptorContextMut<'_>,
_runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
if let Some(token_bucket) = cfg.load::<TokenBucket>() {
let token_bucket_time_source = token_bucket.time_source();
let token_time = token_bucket_time_source.now();

assert_eq!(
*THE_TIME, token_time,
"Token source should match the configured time source"
);
}
Ok(())
}
}

#[tokio::test]
async fn test_token_bucket_gets_time_source_from_config() {
let time_source = ManualTimeSource::new(*THE_TIME);
let shared_time_source = SharedTimeSource::new(time_source);

let http_client = StaticReplayClient::new(vec![ReplayEvent::new(
http_1x::Request::builder()
.uri("https://www.doesntmatter.com")
.body(SdkBody::empty())
.unwrap(),
http_1x::Response::builder()
.status(200)
.body(SdkBody::from("<ListBucketResult></ListBucketResult>"))
.unwrap(),
)]);

let config = Config::builder()
.region(Region::new("us-east-1"))
.http_client(http_client)
.time_source(shared_time_source)
.interceptor(TimeSourceValidationInterceptor)
.build();

let client = Client::from_conf(config);

let _result = client
.list_objects_v2()
.bucket("test-bucket")
.send()
.await
.unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ private fun baseClientRuntimePluginsFn(
#{DefaultPluginParams}::new()
.with_retry_partition_name(default_retry_partition)
.with_behavior_version(config.behavior_version.expect(${behaviorVersionError.dq()}))
.with_time_source(config.runtime_components.time_source().unwrap_or_default())
))
// user config
.with_client_plugin(
Expand Down
69 changes: 62 additions & 7 deletions rust-runtime/aws-smithy-runtime/src/client/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::client::retries::strategy::standard::TokenBucketProvider;
use crate::client::retries::strategy::StandardRetryStrategy;
use crate::client::retries::RetryPartition;
use aws_smithy_async::rt::sleep::default_async_sleep;
use aws_smithy_async::time::SystemTimeSource;
use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource, TimeSource};
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
use aws_smithy_runtime_api::client::http::SharedHttpClient;
Expand Down Expand Up @@ -126,6 +126,7 @@ pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
}

/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
#[deprecated = "Use default_retry_config_plugin_v2 to get a TokenBucket that respects the user provided TimeSource."]
pub fn default_retry_config_plugin(
default_partition_name: impl Into<Cow<'static, str>>,
) -> Option<SharedRuntimePlugin> {
Expand All @@ -137,7 +138,43 @@ pub fn default_retry_config_plugin(
.with_config_validator(SharedConfigValidator::base_client_config_fn(
validate_retry_config,
))
.with_interceptor(TokenBucketProvider::new(retry_partition.clone()))
.with_interceptor(TokenBucketProvider::new(
retry_partition.clone(),
SharedTimeSource::default(), // Replicates previous behavior
))
})
.with_config(layer("default_retry_config", |layer| {
layer.store_put(RetryConfig::disabled());
layer.store_put(retry_partition);
}))
.into_shared(),
)
}

/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
pub fn default_retry_config_plugin_v2(
default_plugin_params: &DefaultPluginParams,
) -> Option<SharedRuntimePlugin> {
let retry_partition = RetryPartition::new(
default_plugin_params
.retry_partition_name()
.clone()
.expect("retry_partition_name is required"),
);
Some(
default_plugin("default_retry_config_plugin", |components| {
components
.with_retry_strategy(Some(StandardRetryStrategy::new()))
.with_config_validator(SharedConfigValidator::base_client_config_fn(
validate_retry_config,
))
.with_interceptor(TokenBucketProvider::new(
retry_partition.clone(),
default_plugin_params
.time_source
.clone()
.unwrap_or_default(),
))
})
.with_config(layer("default_retry_config", |layer| {
layer.store_put(RetryConfig::disabled());
Expand Down Expand Up @@ -288,6 +325,7 @@ fn validate_stalled_stream_protection_config(
pub struct DefaultPluginParams {
retry_partition_name: Option<Cow<'static, str>>,
behavior_version: Option<BehaviorVersion>,
time_source: Option<SharedTimeSource>,
}

impl DefaultPluginParams {
Expand All @@ -302,11 +340,32 @@ impl DefaultPluginParams {
self
}

/// Gets the retry partition name.
pub fn retry_partition_name(&self) -> &Option<Cow<'static, str>> {
&self.retry_partition_name
}

/// Sets the behavior major version.
pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
self.behavior_version = Some(version);
self
}

/// Gets the behavior major version.
pub fn behavior_version(&self) -> &Option<BehaviorVersion> {
&self.behavior_version
}

/// Sets the time_source.
pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
self.time_source = Some(SharedTimeSource::new(time_source));
self
}

/// Gets the time_source.
pub fn time_source(&self) -> &Option<SharedTimeSource> {
&self.time_source
}
}

/// All default plugins.
Expand All @@ -320,11 +379,7 @@ pub fn default_plugins(
[
default_http_client_plugin_v2(behavior_version),
default_identity_cache_plugin(),
default_retry_config_plugin(
params
.retry_partition_name
.expect("retry_partition_name is required"),
),
default_retry_config_plugin_v2(&params),
default_sleep_impl_plugin(),
default_time_source_plugin(),
default_timeout_config_plugin(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use aws_smithy_async::time::SharedTimeSource;
use tokio::sync::OwnedSemaphorePermit;
use tracing::{debug, trace};

Expand Down Expand Up @@ -371,8 +372,12 @@ impl TokenBucketProvider {
///
/// NOTE: This partition should be the one used for every operation on a client
/// unless config is overridden.
pub(crate) fn new(default_partition: RetryPartition) -> Self {
let token_bucket = TOKEN_BUCKET.get_or_init_default(default_partition.clone());
pub(crate) fn new(default_partition: RetryPartition, time_source: SharedTimeSource) -> Self {
let token_bucket = TOKEN_BUCKET.get_or_init(default_partition.clone(), || {
let mut tb = TokenBucket::default();
tb.update_time_source(time_source);
tb
});
Self {
default_partition,
token_bucket,
Expand All @@ -388,7 +393,7 @@ impl Intercept for TokenBucketProvider {
fn modify_before_retry_loop(
&self,
_context: &mut BeforeTransmitInterceptorContextMut<'_>,
_runtime_components: &RuntimeComponents,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
Expand All @@ -402,7 +407,11 @@ impl Intercept for TokenBucketProvider {
// avoid contention on the global lock
self.token_bucket.clone()
} else {
TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
TOKEN_BUCKET.get_or_init(retry_partition.clone(), || {
let mut tb = TokenBucket::default();
tb.update_time_source(runtime_components.time_source().unwrap_or_default());
tb
})
}
}
RetryPartitionInner::Custom { token_bucket, .. } => token_bucket.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,19 @@ impl TokenBucket {
pub(crate) fn available_permits(&self) -> usize {
self.semaphore.available_permits()
}

// Allows us to create a default client but still update the time_source
pub(crate) fn update_time_source(&mut self, new_time_source: SharedTimeSource) {
self.time_source = new_time_source;
}

#[allow(dead_code)]
#[doc(hidden)]
#[cfg(any(test, feature = "test-util", feature = "legacy-test-util"))]
/// This method should only be used for internal testing
pub fn time_source(&self) -> &SharedTimeSource {
&self.time_source
}
}

/// Builder for constructing a `TokenBucket`.
Expand Down
Loading