Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/core/src/raw/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,13 @@ impl From<Operation> for String {
v.into_static().to_string()
}
}

/// A service-specific operation name attached to HTTP requests.
///
/// While [`Operation`] describes the OpenDAL-level operation (e.g., read, write, list),
/// `ServiceOperation` describes the specific backend API call being made as a supplement.
///
/// Services attach this as an HTTP request extension so that observability layers
/// can provide finer-grained breakdowns of HTTP traffic.
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
pub struct ServiceOperation(pub &'static str);
4 changes: 4 additions & 0 deletions core/layers/fastmetrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ impl LabelSetSchema for OperationLabels {
observe::LABEL_OPERATION,
observe::LABEL_ERROR,
observe::LABEL_STATUS_CODE,
observe::LABEL_SERVICE_OPERATION,
];
Some(NAMES)
}
Expand All @@ -517,6 +518,9 @@ impl EncodeLabelSet for OperationLabels {
if let Some(code) = &self.labels.status_code {
encoder.encode(&(observe::LABEL_STATUS_CODE, code.as_str()))?;
}
if let Some(service_operation) = self.labels.service_operation {
encoder.encode(&(observe::LABEL_SERVICE_OPERATION, service_operation))?;
}
Ok(())
}
}
7 changes: 7 additions & 0 deletions core/layers/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl OperationLabels {
));
}

if let Some(service_operation) = self.0.service_operation {
labels.push(Label::new(
observe::LABEL_SERVICE_OPERATION,
service_operation,
));
}

labels
}
}
7 changes: 6 additions & 1 deletion core/layers/observe-metrics-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ pub static LABEL_OPERATION: &str = "operation";
pub static LABEL_ERROR: &str = "error";
/// The metric label for the http code.
pub static LABEL_STATUS_CODE: &str = "status_code";
/// The metric label for the service-specific operation (e.g., "GetObject", "UploadPart").
pub static LABEL_SERVICE_OPERATION: &str = "service_operation";

/// MetricLabels are the labels for the metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
Expand All @@ -226,6 +228,8 @@ pub struct MetricLabels {
/// Only populated for `HttpStatusErrorsTotal` metric.
/// Used to track frequency of specific HTTP error status codes.
pub status_code: Option<StatusCode>,
/// The service-specific operation name as an optional supplement for operation name.
pub service_operation: Option<&'static str>,
}

impl MetricLabels {
Expand Down Expand Up @@ -569,14 +573,15 @@ impl<I: MetricsIntercept> Drop for ExecutingGuard<I> {

impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
let labels = MetricLabels::new(
let mut labels = MetricLabels::new(
self.info.clone(),
req.extensions()
.get::<Operation>()
.copied()
.map(Operation::into_static)
.unwrap_or("unknown"),
);
labels.service_operation = req.extensions().get::<ServiceOperation>().map(|s| s.0);

let start = Instant::now();
let req_size = req.body().len();
Expand Down
7 changes: 7 additions & 0 deletions core/layers/otelmetrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,13 @@ impl OtelMetricsInterceptor {
));
}

if let Some(service_operation) = attrs.service_operation {
attributes.push(KeyValue::new(
observe::LABEL_SERVICE_OPERATION,
service_operation,
));
}

attributes
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/layers/prometheus-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ impl EncodeLabelSet for OperationLabels {
if let Some(code) = &self.labels.status_code {
(observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
}
if let Some(service_operation) = self.labels.service_operation {
(observe::LABEL_SERVICE_OPERATION, service_operation).encode(encoder.encode_label())?;
}
Ok(())
}
}
Expand Down
84 changes: 53 additions & 31 deletions core/layers/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,13 @@ impl PrometheusLayerBuilder {
.map_err(parse_prometheus_error)?
};

let http_labels = OperationLabels::names().with_service_operation();
let http_executing = {
let metric = observe::MetricValue::HttpExecuting(0);
register_int_gauge_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
registry
)
.map_err(parse_prometheus_error)?
Expand All @@ -390,7 +391,7 @@ impl PrometheusLayerBuilder {
register_histogram_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
self.bytes_buckets.clone(),
registry
)
Expand All @@ -401,7 +402,7 @@ impl PrometheusLayerBuilder {
register_histogram_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
self.bytes_rate_buckets.clone(),
registry
)
Expand All @@ -412,7 +413,7 @@ impl PrometheusLayerBuilder {
register_histogram_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
self.duration_seconds_buckets.clone(),
registry
)
Expand All @@ -423,7 +424,7 @@ impl PrometheusLayerBuilder {
register_histogram_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
self.bytes_buckets,
registry
)
Expand All @@ -434,7 +435,7 @@ impl PrometheusLayerBuilder {
register_histogram_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
self.bytes_rate_buckets,
registry
)
Expand All @@ -445,7 +446,7 @@ impl PrometheusLayerBuilder {
register_histogram_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
self.duration_seconds_buckets,
registry
)
Expand All @@ -456,19 +457,21 @@ impl PrometheusLayerBuilder {
register_int_counter_vec_with_registry!(
metric.name(),
metric.help(),
labels.as_ref(),
http_labels.as_ref(),
registry
)
.map_err(parse_prometheus_error)?
};

let labels_with_status_code = OperationLabels::names().with_status_code();
let http_labels_with_status_code = OperationLabels::names()
.with_service_operation()
.with_status_code();
let http_status_errors_total = {
let metric = observe::MetricValue::HttpStatusErrorsTotal;
register_int_counter_vec_with_registry!(
metric.name(),
metric.help(),
labels_with_status_code.as_ref(),
http_labels_with_status_code.as_ref(),
registry
)
.map_err(parse_prometheus_error)?
Expand Down Expand Up @@ -562,72 +565,72 @@ impl observe::MetricsIntercept for PrometheusInterceptor {
match value {
observe::MetricValue::OperationBytes(v) => self
.operation_bytes
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.observe(v as f64),
observe::MetricValue::OperationBytesRate(v) => self
.operation_bytes_rate
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.observe(v),
observe::MetricValue::OperationEntries(v) => self
.operation_entries
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.observe(v as f64),
observe::MetricValue::OperationEntriesRate(v) => self
.operation_entries_rate
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.observe(v),
observe::MetricValue::OperationDurationSeconds(v) => self
.operation_duration_seconds
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.observe(v.as_secs_f64()),
observe::MetricValue::OperationErrorsTotal => self
.operation_errors_total
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.inc(),
observe::MetricValue::OperationExecuting(v) => self
.operation_executing
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.add(v as i64),
observe::MetricValue::OperationTtfbSeconds(v) => self
.operation_ttfb_seconds
.with_label_values(&labels.values())
.with_label_values(&labels.op_values())
.observe(v.as_secs_f64()),

observe::MetricValue::HttpExecuting(v) => self
.http_executing
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.add(v as i64),
observe::MetricValue::HttpRequestBytes(v) => self
.http_request_bytes
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.observe(v as f64),
observe::MetricValue::HttpRequestBytesRate(v) => self
.http_request_bytes_rate
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.observe(v),
observe::MetricValue::HttpRequestDurationSeconds(v) => self
.http_request_duration_seconds
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.observe(v.as_secs_f64()),
observe::MetricValue::HttpResponseBytes(v) => self
.http_response_bytes
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.observe(v as f64),
observe::MetricValue::HttpResponseBytesRate(v) => self
.http_response_bytes_rate
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.observe(v),
observe::MetricValue::HttpResponseDurationSeconds(v) => self
.http_response_duration_seconds
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.observe(v.as_secs_f64()),
observe::MetricValue::HttpConnectionErrorsTotal => self
.http_connection_errors_total
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.inc(),
observe::MetricValue::HttpStatusErrorsTotal => self
.http_status_errors_total
.with_label_values(&labels.values())
.with_label_values(&labels.http_values())
.inc(),
_ => {}
}
Expand All @@ -652,6 +655,11 @@ impl OperationLabelNames {
self.0.push(observe::LABEL_STATUS_CODE);
self
}

fn with_service_operation(mut self) -> Self {
self.0.push(observe::LABEL_SERVICE_OPERATION);
self
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand All @@ -667,15 +675,29 @@ impl OperationLabels {
])
}

fn values(&self) -> Vec<&str> {
let mut labels = Vec::with_capacity(6);
fn op_values(&self) -> Vec<&str> {
let mut labels = vec![
self.0.scheme,
self.0.namespace.as_ref(),
self.0.root.as_ref(),
self.0.operation,
];

if let Some(error) = self.0.error {
labels.push(error.into_static());
}

labels
}

labels.extend([
fn http_values(&self) -> Vec<&str> {
let mut labels = vec![
self.0.scheme,
self.0.namespace.as_ref(),
self.0.root.as_ref(),
self.0.operation,
]);
self.0.service_operation.unwrap_or("unknown"),
];

if let Some(error) = self.0.error {
labels.push(error.into_static());
Expand Down
Loading
Loading