Skip to content

Commit 22baaaa

Browse files
authored
Merge pull request #236 from brave/add-epoch-prom
Add epoch to prometheus metrics
2 parents 189a29e + e1b545b commit 22baaaa

File tree

2 files changed

+55
-15
lines changed

2 files changed

+55
-15
lines changed

src/prometheus.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::sync::Mutex;
1616
pub struct WebMetrics {
1717
total_requests: Family<TotalMetricLabels, Counter>,
1818
in_flight_requests: Family<InflightMetricLabels, Gauge>,
19-
request_duration: Family<TotalMetricLabels, Histogram>,
19+
request_duration: Family<RequestDurationLabels, Histogram>,
2020
}
2121

2222
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)]
@@ -27,6 +27,14 @@ pub struct InflightMetricLabels {
2727

2828
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)]
2929
pub struct TotalMetricLabels {
30+
method: String,
31+
path: String,
32+
epoch: Option<u8>,
33+
status: u16,
34+
}
35+
36+
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)]
37+
pub struct RequestDurationLabels {
3038
method: String,
3139
path: String,
3240
status: u16,
@@ -41,12 +49,27 @@ impl From<&ServiceRequest> for InflightMetricLabels {
4149
}
4250
}
4351

44-
impl From<(&InflightMetricLabels, StatusCode)> for TotalMetricLabels {
45-
fn from(value: (&InflightMetricLabels, StatusCode)) -> Self {
52+
impl TotalMetricLabels {
53+
pub fn new(
54+
inflight_labels: &InflightMetricLabels,
55+
status: StatusCode,
56+
epoch: Option<u8>,
57+
) -> Self {
58+
Self {
59+
method: inflight_labels.method.clone(),
60+
path: inflight_labels.path.clone(),
61+
epoch,
62+
status: status.as_u16(),
63+
}
64+
}
65+
}
66+
67+
impl RequestDurationLabels {
68+
pub fn new(inflight_labels: &InflightMetricLabels, status: StatusCode) -> Self {
4669
Self {
47-
method: value.0.method.clone(),
48-
path: value.0.path.clone(),
49-
status: value.1.as_u16(),
70+
method: inflight_labels.method.clone(),
71+
path: inflight_labels.path.clone(),
72+
status: status.as_u16(),
5073
}
5174
}
5275
}
@@ -70,6 +93,7 @@ impl WebMetrics {
7093
&self,
7194
inflight_labels: &InflightMetricLabels,
7295
total_labels: &TotalMetricLabels,
96+
duration_labels: &RequestDurationLabels,
7397
request_duration: Duration,
7498
) {
7599
match total_labels.status == 404 {
@@ -81,7 +105,7 @@ impl WebMetrics {
81105
self.in_flight_requests.get_or_create(inflight_labels).dec();
82106
self
83107
.request_duration
84-
.get_or_create(total_labels)
108+
.get_or_create(duration_labels)
85109
.observe(request_duration.as_secs_f64());
86110
self.total_requests.get_or_create(total_labels).inc();
87111
}

src/server.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use crate::channel::get_data_channel_map_from_env;
22
use crate::prometheus::{
3-
create_metric_server, InflightMetricLabels, TotalMetricLabels, WebMetrics,
3+
create_metric_server, InflightMetricLabels, RequestDurationLabels, TotalMetricLabels, WebMetrics,
44
};
55
use crate::record_stream::{
66
get_data_channel_topic_map_from_env, KafkaRecordStream, KafkaRecordStreamConfig, RecordStream,
77
};
88
use crate::star::{parse_message, AppSTARError};
99
use crate::util::parse_env_var;
10-
use actix_web::HttpRequest;
1110
use actix_web::{
1211
dev::Service,
1312
error::ResponseError,
@@ -17,6 +16,7 @@ use actix_web::{
1716
web::{self, Data},
1817
App, HttpResponse, HttpServer, Responder,
1918
};
19+
use actix_web::{HttpMessage, HttpRequest};
2020
use base64::{engine::general_purpose as base64_engine, Engine as _};
2121
use derive_more::{Display, Error, From};
2222
use futures::{future::try_join, FutureExt};
@@ -58,6 +58,8 @@ pub struct ServerState {
5858
pub request_threshold_range: RangeInclusive<usize>,
5959
}
6060

61+
struct EpochExt(u8);
62+
6163
impl ResponseError for WebError {
6264
fn error_response(&self) -> HttpResponse {
6365
HttpResponse::build(self.status_code())
@@ -106,7 +108,7 @@ async fn handle_measurement_submit(
106108
Some(rec_stream) => {
107109
let body_str = from_utf8(&body)?.trim();
108110
let bincode_msg = base64_engine::STANDARD.decode(body_str)?;
109-
parse_message(&bincode_msg)?;
111+
let message = parse_message(&bincode_msg)?;
110112

111113
if let Some(min_revision) = state.min_revision_map.get(channel_name) {
112114
let req_revision: usize =
@@ -128,7 +130,11 @@ async fn handle_measurement_submit(
128130
error!("Failed to push message: {}", e);
129131
Err(WebError::Internal)
130132
}
131-
Ok(_) => Ok(HttpResponse::NoContent().finish()),
133+
Ok(_) => {
134+
let mut response = HttpResponse::NoContent();
135+
response.extensions_mut().insert(EpochExt(message.epoch));
136+
Ok(response.finish())
137+
}
132138
}
133139
}
134140
}
@@ -219,14 +225,24 @@ pub async fn start_server(worker_count: usize, main_channel: String) -> std::io:
219225
let start_time = Instant::now();
220226

221227
srv.call(request).map(move |result| {
222-
let status_code = match result.as_ref() {
223-
Ok(response) => response.status(),
224-
Err(err) => err.as_response_error().status_code(),
228+
let (epoch, status_code) = match result.as_ref() {
229+
Ok(response) => (
230+
response
231+
.response()
232+
.extensions()
233+
.get::<EpochExt>()
234+
.map(|ext| ext.0),
235+
response.status(),
236+
),
237+
Err(err) => (None, err.as_response_error().status_code()),
225238
};
226-
let total_metric_labels = TotalMetricLabels::from((&inflight_metric_labels, status_code));
239+
let total_metric_labels =
240+
TotalMetricLabels::new(&inflight_metric_labels, status_code, epoch);
241+
let duration_labels = RequestDurationLabels::new(&inflight_metric_labels, status_code);
227242
web_metrics.request_end(
228243
&inflight_metric_labels,
229244
&total_metric_labels,
245+
&duration_labels,
230246
start_time.elapsed(),
231247
);
232248

0 commit comments

Comments
 (0)