Skip to content

Commit b43a32e

Browse files
committed
Refactor TokenBucket to not carry a TimeSource
1 parent daaaaae commit b43a32e

4 files changed

Lines changed: 91 additions & 162 deletions

File tree

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ private fun baseClientRuntimePluginsFn(
278278
#{DefaultPluginParams}::new()
279279
.with_retry_partition_name(default_retry_partition)
280280
.with_behavior_version(config.behavior_version.expect(${behaviorVersionError.dq()}))
281-
.with_time_source(config.runtime_components.time_source().unwrap_or_default())
282281
))
283282
// user config
284283
.with_client_plugin(

rust-runtime/aws-smithy-runtime/src/client/defaults.rs

Lines changed: 7 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::client::retries::strategy::standard::TokenBucketProvider;
1515
use crate::client::retries::strategy::StandardRetryStrategy;
1616
use crate::client::retries::RetryPartition;
1717
use aws_smithy_async::rt::sleep::default_async_sleep;
18-
use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource, TimeSource};
18+
use aws_smithy_async::time::SystemTimeSource;
1919
use aws_smithy_runtime_api::box_error::BoxError;
2020
use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
2121
use aws_smithy_runtime_api::client::http::SharedHttpClient;
@@ -126,7 +126,6 @@ pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
126126
}
127127

128128
/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
129-
#[deprecated = "Use default_retry_config_plugin_v2 to get a TokenBucket that respects the user provided TimeSource."]
130129
pub fn default_retry_config_plugin(
131130
default_partition_name: impl Into<Cow<'static, str>>,
132131
) -> Option<SharedRuntimePlugin> {
@@ -138,43 +137,7 @@ pub fn default_retry_config_plugin(
138137
.with_config_validator(SharedConfigValidator::base_client_config_fn(
139138
validate_retry_config,
140139
))
141-
.with_interceptor(TokenBucketProvider::new(
142-
retry_partition.clone(),
143-
SharedTimeSource::default(), // Replicates previous behavior
144-
))
145-
})
146-
.with_config(layer("default_retry_config", |layer| {
147-
layer.store_put(RetryConfig::disabled());
148-
layer.store_put(retry_partition);
149-
}))
150-
.into_shared(),
151-
)
152-
}
153-
154-
/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
155-
pub fn default_retry_config_plugin_v2(
156-
default_plugin_params: &DefaultPluginParams,
157-
) -> Option<SharedRuntimePlugin> {
158-
let retry_partition = RetryPartition::new(
159-
default_plugin_params
160-
.retry_partition_name()
161-
.clone()
162-
.expect("retry_partition_name is required"),
163-
);
164-
Some(
165-
default_plugin("default_retry_config_plugin", |components| {
166-
components
167-
.with_retry_strategy(Some(StandardRetryStrategy::new()))
168-
.with_config_validator(SharedConfigValidator::base_client_config_fn(
169-
validate_retry_config,
170-
))
171-
.with_interceptor(TokenBucketProvider::new(
172-
retry_partition.clone(),
173-
default_plugin_params
174-
.time_source
175-
.clone()
176-
.unwrap_or_default(),
177-
))
140+
.with_interceptor(TokenBucketProvider::new(retry_partition.clone()))
178141
})
179142
.with_config(layer("default_retry_config", |layer| {
180143
layer.store_put(RetryConfig::disabled());
@@ -325,7 +288,6 @@ fn validate_stalled_stream_protection_config(
325288
pub struct DefaultPluginParams {
326289
retry_partition_name: Option<Cow<'static, str>>,
327290
behavior_version: Option<BehaviorVersion>,
328-
time_source: Option<SharedTimeSource>,
329291
}
330292

331293
impl DefaultPluginParams {
@@ -340,32 +302,11 @@ impl DefaultPluginParams {
340302
self
341303
}
342304

343-
/// Gets the retry partition name.
344-
pub fn retry_partition_name(&self) -> &Option<Cow<'static, str>> {
345-
&self.retry_partition_name
346-
}
347-
348305
/// Sets the behavior major version.
349306
pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
350307
self.behavior_version = Some(version);
351308
self
352309
}
353-
354-
/// Gets the behavior major version.
355-
pub fn behavior_version(&self) -> &Option<BehaviorVersion> {
356-
&self.behavior_version
357-
}
358-
359-
/// Sets the time_source.
360-
pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
361-
self.time_source = Some(SharedTimeSource::new(time_source));
362-
self
363-
}
364-
365-
/// Gets the time_source.
366-
pub fn time_source(&self) -> &Option<SharedTimeSource> {
367-
&self.time_source
368-
}
369310
}
370311

371312
/// All default plugins.
@@ -379,7 +320,11 @@ pub fn default_plugins(
379320
[
380321
default_http_client_plugin_v2(behavior_version),
381322
default_identity_cache_plugin(),
382-
default_retry_config_plugin_v2(&params),
323+
default_retry_config_plugin(
324+
params
325+
.retry_partition_name
326+
.expect("retry_partition_name is required"),
327+
),
383328
default_sleep_impl_plugin(),
384329
default_time_source_plugin(),
385330
default_timeout_config_plugin(),

rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
use std::sync::Mutex;
77
use std::time::{Duration, SystemTime};
88

9-
use aws_smithy_async::time::SharedTimeSource;
109
use tokio::sync::OwnedSemaphorePermit;
1110
use tracing::{debug, trace};
1211

@@ -257,7 +256,10 @@ impl RetryStrategy for StandardRetryStrategy {
257256

258257
// acquire permit for retry
259258
let error_kind = error_kind.expect("result was classified retryable");
260-
match token_bucket.acquire(&error_kind) {
259+
match token_bucket.acquire(
260+
&error_kind,
261+
&runtime_components.time_source().unwrap_or_default(),
262+
) {
261263
Some(permit) => self.set_retry_permit(permit),
262264
None => {
263265
debug!("attempt #{request_attempts} failed with {error_kind:?}; However, not enough retry quota is available for another attempt so no retry will be attempted.");
@@ -372,12 +374,8 @@ impl TokenBucketProvider {
372374
///
373375
/// NOTE: This partition should be the one used for every operation on a client
374376
/// unless config is overridden.
375-
pub(crate) fn new(default_partition: RetryPartition, time_source: SharedTimeSource) -> Self {
376-
let token_bucket = TOKEN_BUCKET.get_or_init(default_partition.clone(), || {
377-
let mut tb = TokenBucket::default();
378-
tb.update_time_source(time_source);
379-
tb
380-
});
377+
pub(crate) fn new(default_partition: RetryPartition) -> Self {
378+
let token_bucket = TOKEN_BUCKET.get_or_init_default(default_partition.clone());
381379
Self {
382380
default_partition,
383381
token_bucket,
@@ -393,7 +391,7 @@ impl Intercept for TokenBucketProvider {
393391
fn modify_before_retry_loop(
394392
&self,
395393
_context: &mut BeforeTransmitInterceptorContextMut<'_>,
396-
runtime_components: &RuntimeComponents,
394+
_runtime_components: &RuntimeComponents,
397395
cfg: &mut ConfigBag,
398396
) -> Result<(), BoxError> {
399397
let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
@@ -407,11 +405,7 @@ impl Intercept for TokenBucketProvider {
407405
// avoid contention on the global lock
408406
self.token_bucket.clone()
409407
} else {
410-
TOKEN_BUCKET.get_or_init(retry_partition.clone(), || {
411-
let mut tb = TokenBucket::default();
412-
tb.update_time_source(runtime_components.time_source().unwrap_or_default());
413-
tb
414-
})
408+
TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
415409
}
416410
}
417411
RetryPartitionInner::Custom { token_bucket, .. } => token_bucket.clone(),

0 commit comments

Comments
 (0)