Commit 6efd528

chore: switch metrics & elasticache calls to manual pagination (#332)
Manually paginate the metrics and elasticache/serverless elasticache
calls, since the rate-limiting code does not work properly with the
AWS client's paginator streams.
nand4011 authored Sep 18, 2024
1 parent 98aeb6c commit 6efd528
Showing 3 changed files with 69 additions and 84 deletions.
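
All three files get the same treatment: the AWS SDK's into_paginator() stream is replaced by an explicit loop that issues one request per iteration, threads the pagination marker (or next token) from each response into the next request, and wraps each individual send() in the existing rate_limit helper. The following is a minimal, dependency-free sketch of that pattern; fetch_page is a hypothetical stand-in for a single paginated AWS call (DescribeCacheClusters, GetMetricData, DescribeServerlessCaches), and a plain sleep stands in for the governor rate limiter used in the real code.

    // Manual pagination: one rate-limited request per loop iteration,
    // stopping when the service stops returning a continuation token.
    use std::thread::sleep;
    use std::time::Duration;

    struct Page {
        items: Vec<String>,
        next_token: Option<String>,
    }

    // Hypothetical stand-in for one paginated API request.
    fn fetch_page(token: Option<&str>) -> Page {
        match token {
            None => Page {
                items: vec!["a".into(), "b".into()],
                next_token: Some("t1".into()),
            },
            Some("t1") => Page {
                items: vec!["c".into()],
                next_token: None,
            },
            _ => Page {
                items: vec![],
                next_token: None,
            },
        }
    }

    fn main() {
        let mut items = Vec::new();
        let mut next_token: Option<String> = None;
        loop {
            // Gate each request individually (the real code awaits a
            // governor limiter here instead of sleeping).
            sleep(Duration::from_millis(100));
            let page = fetch_page(next_token.as_deref());
            items.extend(page.items);
            next_token = page.next_token;
            if next_token.is_none() {
                break;
            }
        }
        println!("collected {} items", items.len());
    }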
momento/src/commands/cloud_linter/elasticache.rs (26 additions, 33 deletions)
@@ -217,45 +217,38 @@ async fn describe_clusters(
     control_plane_limiter: Arc<DefaultDirectRateLimiter>,
     region: &str,
 ) -> Result<Vec<ElastiCacheResource>, CliError> {
-    let mut resources = Vec::new();
-    let mut elasticache_stream = elasticache_client
-        .describe_cache_clusters()
-        .show_cache_node_info(true)
-        .into_paginator()
-        .send();
-
-    while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || {
-        elasticache_stream.next()
-    })
-    .await
-    {
-        match result {
-            Ok(result) => {
-                if let Some(aws_clusters) = result.cache_clusters {
-                    let mut chunks = Vec::new();
-                    for chunk in aws_clusters.chunks(10) {
-                        chunks.push(chunk.to_owned());
-                    }
-                    for clusters in chunks {
-                        for cluster in clusters {
-                            let cluster_resources = convert_to_resources(cluster, region).await?;
-                            resources.extend(cluster_resources);
-                        }
-                    }
-                }
-            }
-            Err(err) => {
-                return Err(CliError {
-                    msg: format!("Failed to describe cache clusters: {}", err),
-                });
-            }
-        }
-    }
-
-    Ok(resources)
+    let mut clusters = Vec::new();
+    let mut next_marker: Option<String> = None;
+    loop {
+        let response = rate_limit(Arc::clone(&control_plane_limiter), || {
+            let mut req = elasticache_client
+                .describe_cache_clusters()
+                .show_cache_node_info(true);
+            if let Some(marker) = &next_marker {
+                req = req.marker(marker);
+            }
+            req.send()
+        })
+        .await?;
+
+        if let Some(aws_clusters) = response.cache_clusters.as_ref() {
+            clusters.extend_from_slice(aws_clusters);
+        }
+
+        next_marker = response.marker().map(String::from);
+        if next_marker.is_none() {
+            break;
+        }
+    }
+
+    clusters
+        .into_iter()
+        .map(|cluster| convert_to_resources(cluster, region))
+        .collect::<Result<Vec<_>, _>>()
+        .map(|vec| vec.into_iter().flatten().collect())
 }
 
-async fn convert_to_resources(
+fn convert_to_resources(
     cluster: CacheCluster,
     region: &str,
 ) -> Result<Vec<ElastiCacheResource>, CliError> {
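
The rate_limit helper itself is not part of this diff. A plausible minimal shape, assuming it simply waits on the governor limiter for a permit and then runs the wrapped request once (a sketch, not the repository's actual definition):

    use std::future::Future;
    use std::sync::Arc;

    use governor::DefaultDirectRateLimiter;

    // Sketch: acquire a permit from the shared limiter, then run the
    // request closure exactly once and return its output.
    async fn rate_limit<F, Fut, T>(limiter: Arc<DefaultDirectRateLimiter>, f: F) -> T
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = T>,
    {
        limiter.until_ready().await;
        f().await
    }

Whatever the exact incompatibility was (the commit message only says the limiter does not work properly with the paginator stream), the manual pagination loops keep the pairing explicit: each rate_limit call wraps exactly one send().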
momento/src/commands/cloud_linter/metrics.rs (19 additions, 20 deletions)
@@ -148,26 +148,20 @@ async fn query_metrics_for_target(
         metric_data_queries.push(metric_data_query);
     }
 
-    let mut metric_stream = client
-        .get_metric_data()
-        .start_time(DateTime::from_millis(start_millis))
-        .end_time(DateTime::from_millis(end_millis))
-        .set_metric_data_queries(Some(metric_data_queries))
-        .into_paginator()
-        .send();
-
-    while let Some(result) = rate_limit(Arc::clone(&limiter), || metric_stream.next()).await {
-        let result = match result {
-            Ok(res) => res,
-            Err(e) => {
-                println!("get_metric_data_error: {:?}", e);
-                return Err(CliError {
-                    msg: "error from aws api while querying metrics".to_string(),
-                });
-            }
-        };
-        // let result = result?;
-        if let Some(mdr_vec) = result.metric_data_results {
+    let mut next_token: Option<String> = None;
+    loop {
+        let response = rate_limit(Arc::clone(&limiter), || {
+            client
+                .get_metric_data()
+                .start_time(DateTime::from_millis(start_millis))
+                .end_time(DateTime::from_millis(end_millis))
+                .set_metric_data_queries(Some(metric_data_queries.clone()))
+                .set_next_token(next_token)
+                .send()
+        })
+        .await?;
+
+        if let Some(mdr_vec) = response.metric_data_results {
             for mdr in mdr_vec {
                 let name = mdr.id.ok_or_else(|| CliError {
                     msg: "Metric has no id".to_string(),
@@ -178,6 +172,11 @@
                 metric_results.push(Metric { name, values });
             }
         }
+
+        next_token = response.next_token;
+        if next_token.is_none() {
+            break;
+        }
     }
 }
 
momento/src/commands/cloud_linter/serverless_elasticache.rs (24 additions, 31 deletions)
@@ -231,42 +231,35 @@ async fn describe_caches(
     control_plane_limiter: Arc<DefaultDirectRateLimiter>,
     region: &str,
 ) -> Result<Vec<ServerlessElastiCacheResource>, CliError> {
-    let mut resources = Vec::new();
-    let mut elasticache_stream = elasticache_client
-        .describe_serverless_caches()
-        .into_paginator()
-        .send();
-
-    while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || {
-        elasticache_stream.next()
-    })
-    .await
-    {
-        match result {
-            Ok(result) => {
-                if let Some(aws_caches) = result.serverless_caches {
-                    let mut chunks = Vec::new();
-                    for chunk in aws_caches.chunks(10) {
-                        chunks.push(chunk.to_owned());
-                    }
-                    for clusters in chunks {
-                        for cluster in clusters {
-                            resources.push(convert_to_resource(cluster, region).await?);
-                        }
-                    }
-                }
-            }
-            Err(err) => {
-                return Err(CliError {
-                    msg: format!("Failed to describe serverless caches: {}", err),
-                });
-            }
-        }
-    }
-    Ok(resources)
+    let mut caches = Vec::new();
+    let mut next_token: Option<String> = None;
+    loop {
+        let response = rate_limit(Arc::clone(&control_plane_limiter), || {
+            let mut req = elasticache_client.describe_serverless_caches();
+            if let Some(token) = &next_token {
+                req = req.next_token(token);
+            }
+            req.send()
+        })
+        .await?;
+
+        if let Some(aws_caches) = response.serverless_caches.as_ref() {
+            caches.extend_from_slice(aws_caches);
+        }
+
+        next_token = response.next_token().map(String::from);
+        if next_token.is_none() {
+            break;
+        }
+    }
+
+    caches
+        .into_iter()
+        .map(|cluster| convert_to_resource(cluster, region))
+        .collect::<Result<_, _>>()
 }
 
-async fn convert_to_resource(
+fn convert_to_resource(
     cache: ServerlessCache,
     region: &str,
 ) -> Result<ServerlessElastiCacheResource, CliError> {
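
For context, the control_plane_limiter and limiter arguments threaded through these functions are governor direct rate limiters. One way such a limiter is commonly constructed and shared across tasks (illustrative values and function name only; the CLI's actual quota is defined elsewhere in the crate):

    use std::num::NonZeroU32;
    use std::sync::Arc;

    use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};

    // Illustrative: a direct (un-keyed) limiter allowing up to ten
    // control-plane calls per second, shared via Arc.
    fn build_control_plane_limiter() -> Arc<DefaultDirectRateLimiter> {
        let quota = Quota::per_second(NonZeroU32::new(10).expect("nonzero quota"));
        Arc::new(RateLimiter::direct(quota))
    }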
