Skip to content

Commit f4024b9

Browse files
authored
Add service counter for service level server metrics (#4540)
## Motivation and Context <!--- Why is this change required? What problem does it solve? --> <!--- If it fixes an open issue, please link to the issue here --> Adds a service counter for guarded decrement of outstanding requests. In case something happens before the response backswing, we will want to ensure it decrements. ## Description <!--- Describe your changes in detail --> ## Testing <!--- Please describe in detail how you tested your changes --> <!--- Include details of your testing environment, and the tests you ran to --> <!--- see how your change affects other areas of the code, etc. --> ## Checklist <!--- If a checkbox below is not applicable, then please DELETE it rather than leaving it unchecked --> - [ ] For changes to the smithy-rs codegen or runtime crates, I have created a changelog entry Markdown file in the `.changelog` directory, specifying "client," "server," or both in the `applies_to` key. - [ ] For changes to the AWS SDK, generated SDK code, or SDK runtime crates, I have created a changelog entry Markdown file in the `.changelog` directory, specifying "aws-sdk-rust" in the `applies_to` key. ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._
1 parent c5041e6 commit f4024b9

5 files changed

Lines changed: 73 additions & 28 deletions

File tree

rust-runtime/aws-smithy-http-server-metrics/src/default.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
//! to collect standard metrics automatically.
1010
1111
use std::fmt::Debug;
12-
use std::sync::atomic::AtomicUsize;
1312
use std::sync::Arc;
1413
use std::sync::Mutex;
1514
use std::time::Duration;
@@ -18,12 +17,21 @@ use metrique::unit_of_work::metrics;
1817
use metrique::Slot;
1918
use metrique::SlotGuard;
2019

20+
use crate::default::service_counter::ServiceCounter;
21+
22+
pub(crate) mod service_counter;
23+
2124
/// Container for keeping track of state across all requests for the service (all operations) for default metrics
2225
///
2326
/// This type is not intended for direct use. See [`DefaultMetricsPlugin`](crate::plugin::DefaultMetricsPlugin).
27+
#[derive(Debug, Default, Clone)]
28+
pub struct DefaultMetricsServiceCounters {
29+
pub(crate) outstanding_requests_counter: ServiceCounter,
30+
}
31+
2432
#[derive(Debug, Default, Clone)]
2533
pub struct DefaultMetricsServiceState {
26-
pub(crate) outstanding_requests_counter: Arc<AtomicUsize>,
34+
pub(crate) outstanding_requests: usize,
2735
}
2836

2937
/// Container for default request and response metrics.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use std::sync::atomic::AtomicUsize;
7+
use std::sync::atomic::Ordering;
8+
use std::sync::Arc;
9+
10+
/// A service-level counter for metrics that need state beyond a single request
11+
#[derive(Default, Debug, Clone)]
12+
pub(crate) struct ServiceCounter {
13+
inner: Arc<ServiceCounterInner>,
14+
}
15+
impl ServiceCounter {
16+
/// Increments the global count by 1, returning a guard that
17+
/// decrements the count on drop, and the new value
18+
pub(crate) fn increment(&self) -> (ServiceCounterGuard, usize) {
19+
let count = self.inner.increment();
20+
(ServiceCounterGuard(Arc::clone(&self.inner)), count)
21+
}
22+
}
23+
24+
#[derive(Default, Debug, Clone)]
25+
pub(crate) struct ServiceCounterInner {
26+
count: Arc<AtomicUsize>,
27+
}
28+
impl ServiceCounterInner {
29+
fn increment(&self) -> usize {
30+
self.count.fetch_add(1, Ordering::Relaxed) + 1
31+
}
32+
}
33+
34+
/// Guard for [`ServiceCounter`] that decrements its count on drop
35+
pub(crate) struct ServiceCounterGuard(Arc<ServiceCounterInner>);
36+
37+
impl Drop for ServiceCounterGuard {
38+
fn drop(&mut self) {
39+
self.0.count.fetch_sub(1, Ordering::Relaxed);
40+
}
41+
}

rust-runtime/aws-smithy-http-server-metrics/src/layer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use thiserror::Error;
1212
use tower::Layer;
1313

1414
use crate::default::DefaultMetrics;
15+
use crate::default::DefaultMetricsServiceCounters;
1516
use crate::default::DefaultMetricsServiceState;
1617
use crate::default::DefaultRequestMetricsConfig;
1718
use crate::default::DefaultResponseMetricsConfig;
@@ -226,7 +227,7 @@ where
226227
default_metrics_extension_fn: self.default_metrics_extension_fn,
227228
default_req_metrics_config: self.default_req_metrics_config.clone(),
228229
default_res_metrics_config: self.default_res_metrics_config.clone(),
229-
default_service_state: DefaultMetricsServiceState::default(),
230+
default_service_counters: DefaultMetricsServiceCounters::default(),
230231

231232
_entry_sink: PhantomData,
232233
}

rust-runtime/aws-smithy-http-server-metrics/src/plugin.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
use std::future::Future;
77
use std::pin::Pin;
8-
use std::sync::atomic::Ordering;
98
use std::task::Context;
109
use std::task::Poll;
1110
use std::time::Duration;
@@ -336,11 +335,8 @@ fn extend_default_request_metrics(
336335
return DefaultRequestMetrics::default();
337336
}
338337

339-
let outstanding_requests = (!config.disable_outstanding_requests).then_some(
340-
ext.service_state
341-
.outstanding_requests_counter
342-
.load(Ordering::Relaxed),
343-
);
338+
let outstanding_requests =
339+
(!config.disable_outstanding_requests).then_some(ext.service_state.outstanding_requests);
344340

345341
DefaultRequestMetrics {
346342
service: metrics.service.filter(|_| !config.disable_service),

rust-runtime/aws-smithy-http-server-metrics/src/service.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
use std::future::Future;
77
use std::marker::PhantomData;
88
use std::pin::Pin;
9-
use std::sync::atomic::Ordering;
109
use std::task::Context;
1110
use std::task::Poll;
1211

1312
use pin_project_lite::pin_project;
1413
use tower::Service;
1514

15+
use crate::default::service_counter::ServiceCounterGuard;
16+
use crate::default::DefaultMetricsServiceCounters;
1617
use crate::default::DefaultMetricsServiceState;
1718
use crate::default::DefaultRequestMetricsConfig;
1819
use crate::default::DefaultResponseMetricsConfig;
@@ -38,7 +39,8 @@ pin_project! {
3839
inner: F,
3940
metrics: metrique::AppendAndCloseOnDrop<Entry, Sink>,
4041
response_metrics: Option<Res>,
41-
default_service_state: DefaultMetricsServiceState
42+
default_service_state: DefaultMetricsServiceCounters,
43+
outstanding_requests_counter_guard: ServiceCounterGuard
4244
}
4345
}
4446

@@ -57,23 +59,13 @@ where
5759

5860
match this.inner.poll(cx) {
5961
Poll::Ready(Ok(mut res)) => {
60-
this.default_service_state
61-
.outstanding_requests_counter
62-
.fetch_sub(1, Ordering::Relaxed);
63-
6462
if let Some(response_metrics) = this.response_metrics {
6563
(response_metrics)(&mut res, this.metrics);
6664
}
6765

6866
Poll::Ready(Ok(res))
6967
}
70-
Poll::Ready(Err(e)) => {
71-
this.default_service_state
72-
.outstanding_requests_counter
73-
.fetch_sub(1, Ordering::Relaxed);
74-
75-
Poll::Ready(Err(e))
76-
}
68+
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
7769
Poll::Pending => Poll::Pending,
7870
}
7971
}
@@ -101,7 +93,7 @@ where
10193
),
10294
pub(crate) default_req_metrics_config: DefaultRequestMetricsConfig,
10395
pub(crate) default_res_metrics_config: DefaultResponseMetricsConfig,
104-
pub(crate) default_service_state: DefaultMetricsServiceState,
96+
pub(crate) default_service_counters: DefaultMetricsServiceCounters,
10597

10698
pub(crate) _entry_sink: PhantomData<Sink>,
10799
}
@@ -121,7 +113,7 @@ where
121113
default_metrics_extension_fn: self.default_metrics_extension_fn,
122114
default_req_metrics_config: self.default_req_metrics_config.clone(),
123115
default_res_metrics_config: self.default_res_metrics_config.clone(),
124-
default_service_state: self.default_service_state.clone(),
116+
default_service_counters: self.default_service_counters.clone(),
125117

126118
_entry_sink: PhantomData,
127119
}
@@ -148,23 +140,30 @@ where
148140
fn call(&mut self, mut req: HttpRequest) -> Self::Future {
149141
let mut metrics = (self.init_metrics)(&mut req);
150142

151-
self.default_service_state
143+
// We increment the outstanding requests and get the count at this layer
144+
// (typically outer middleware) so we can get this as close to the time
145+
// this request entered the system as possible
146+
let (outstanding_requests_counter_guard, outstanding_requests) = self
147+
.default_service_counters
152148
.outstanding_requests_counter
153-
.fetch_add(1, Ordering::Relaxed);
149+
.increment();
154150

155151
(self.default_metrics_extension_fn)(
156152
&mut req,
157153
&mut metrics,
158154
self.default_req_metrics_config.clone(),
159155
self.default_res_metrics_config.clone(),
160-
self.default_service_state.clone(),
156+
DefaultMetricsServiceState {
157+
outstanding_requests,
158+
},
161159
);
162160

163161
MetricsLayerServiceFuture {
164162
inner: self.inner.call(req),
165163
metrics,
166164
response_metrics: self.response_metrics.clone(),
167-
default_service_state: self.default_service_state.clone(),
165+
default_service_state: self.default_service_counters.clone(),
166+
outstanding_requests_counter_guard,
168167
}
169168
}
170169
}

0 commit comments

Comments
 (0)