diff --git a/core/core/src/raw/operation.rs b/core/core/src/raw/operation.rs index ae9e3b47c992..36138dea9fdf 100644 --- a/core/core/src/raw/operation.rs +++ b/core/core/src/raw/operation.rs @@ -86,3 +86,13 @@ impl From for String { v.into_static().to_string() } } + +/// A service-specific operation name attached to HTTP requests. +/// +/// While [`Operation`] describes the OpenDAL-level operation (e.g., read, write, list), +/// `ServiceOperation` describes the specific backend API call being made as a supplement. +/// +/// Services attach this as an HTTP request extension so that observability layers +/// can provide finer-grained breakdowns of HTTP traffic. +#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] +pub struct ServiceOperation(pub &'static str); diff --git a/core/layers/fastmetrics/src/lib.rs b/core/layers/fastmetrics/src/lib.rs index 257452078fc8..f9540abd8975 100644 --- a/core/layers/fastmetrics/src/lib.rs +++ b/core/layers/fastmetrics/src/lib.rs @@ -498,6 +498,7 @@ impl LabelSetSchema for OperationLabels { observe::LABEL_OPERATION, observe::LABEL_ERROR, observe::LABEL_STATUS_CODE, + observe::LABEL_SERVICE_OPERATION, ]; Some(NAMES) } @@ -517,6 +518,9 @@ impl EncodeLabelSet for OperationLabels { if let Some(code) = &self.labels.status_code { encoder.encode(&(observe::LABEL_STATUS_CODE, code.as_str()))?; } + if let Some(service_operation) = self.labels.service_operation { + encoder.encode(&(observe::LABEL_SERVICE_OPERATION, service_operation))?; + } Ok(()) } } diff --git a/core/layers/metrics/src/lib.rs b/core/layers/metrics/src/lib.rs index 09511cc59ae0..487a5131d45b 100644 --- a/core/layers/metrics/src/lib.rs +++ b/core/layers/metrics/src/lib.rs @@ -191,6 +191,13 @@ impl OperationLabels { )); } + if let Some(service_operation) = self.0.service_operation { + labels.push(Label::new( + observe::LABEL_SERVICE_OPERATION, + service_operation, + )); + } + labels } } diff --git a/core/layers/observe-metrics-common/src/lib.rs b/core/layers/observe-metrics-common/src/lib.rs index 8d588ca08125..9c5456304178 100644 --- a/core/layers/observe-metrics-common/src/lib.rs +++ b/core/layers/observe-metrics-common/src/lib.rs @@ -202,6 +202,8 @@ pub static LABEL_OPERATION: &str = "operation"; pub static LABEL_ERROR: &str = "error"; /// The metric label for the http code. pub static LABEL_STATUS_CODE: &str = "status_code"; +/// The metric label for the service-specific operation (e.g., "GetObject", "UploadPart"). +pub static LABEL_SERVICE_OPERATION: &str = "service_operation"; /// MetricLabels are the labels for the metrics. #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] @@ -226,6 +228,8 @@ pub struct MetricLabels { /// Only populated for `HttpStatusErrorsTotal` metric. /// Used to track frequency of specific HTTP error status codes. pub status_code: Option, + /// The service-specific operation name as an optional supplement for operation name. + pub service_operation: Option<&'static str>, } impl MetricLabels { @@ -569,7 +573,7 @@ impl Drop for ExecutingGuard { impl HttpFetch for MetricsHttpFetcher { async fn fetch(&self, req: http::Request) -> Result> { - let labels = MetricLabels::new( + let mut labels = MetricLabels::new( self.info.clone(), req.extensions() .get::() @@ -577,6 +581,7 @@ impl HttpFetch for MetricsHttpFetcher { .map(Operation::into_static) .unwrap_or("unknown"), ); + labels.service_operation = req.extensions().get::().map(|s| s.0); let start = Instant::now(); let req_size = req.body().len(); diff --git a/core/layers/otelmetrics/src/lib.rs b/core/layers/otelmetrics/src/lib.rs index 8c0bb466d9d0..874b55e31454 100644 --- a/core/layers/otelmetrics/src/lib.rs +++ b/core/layers/otelmetrics/src/lib.rs @@ -449,6 +449,13 @@ impl OtelMetricsInterceptor { )); } + if let Some(service_operation) = attrs.service_operation { + attributes.push(KeyValue::new( + observe::LABEL_SERVICE_OPERATION, + service_operation, + )); + } + attributes } } diff --git a/core/layers/prometheus-client/src/lib.rs b/core/layers/prometheus-client/src/lib.rs index 1b5841d8b9ab..d26de1e39af8 100644 --- a/core/layers/prometheus-client/src/lib.rs +++ b/core/layers/prometheus-client/src/lib.rs @@ -506,6 +506,9 @@ impl EncodeLabelSet for OperationLabels { if let Some(code) = &self.labels.status_code { (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?; } + if let Some(service_operation) = self.labels.service_operation { + (observe::LABEL_SERVICE_OPERATION, service_operation).encode(encoder.encode_label())?; + } Ok(()) } } diff --git a/core/layers/prometheus/src/lib.rs b/core/layers/prometheus/src/lib.rs index 81680c99f142..2eda2e30ef84 100644 --- a/core/layers/prometheus/src/lib.rs +++ b/core/layers/prometheus/src/lib.rs @@ -375,12 +375,13 @@ impl PrometheusLayerBuilder { .map_err(parse_prometheus_error)? }; + let http_labels = OperationLabels::names().with_service_operation(); let http_executing = { let metric = observe::MetricValue::HttpExecuting(0); register_int_gauge_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), registry ) .map_err(parse_prometheus_error)? @@ -390,7 +391,7 @@ impl PrometheusLayerBuilder { register_histogram_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), self.bytes_buckets.clone(), registry ) @@ -401,7 +402,7 @@ impl PrometheusLayerBuilder { register_histogram_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), self.bytes_rate_buckets.clone(), registry ) @@ -412,7 +413,7 @@ impl PrometheusLayerBuilder { register_histogram_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), self.duration_seconds_buckets.clone(), registry ) @@ -423,7 +424,7 @@ impl PrometheusLayerBuilder { register_histogram_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), self.bytes_buckets, registry ) @@ -434,7 +435,7 @@ impl PrometheusLayerBuilder { register_histogram_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), self.bytes_rate_buckets, registry ) @@ -445,7 +446,7 @@ impl PrometheusLayerBuilder { register_histogram_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), self.duration_seconds_buckets, registry ) @@ -456,19 +457,21 @@ impl PrometheusLayerBuilder { register_int_counter_vec_with_registry!( metric.name(), metric.help(), - labels.as_ref(), + http_labels.as_ref(), registry ) .map_err(parse_prometheus_error)? }; - let labels_with_status_code = OperationLabels::names().with_status_code(); + let http_labels_with_status_code = OperationLabels::names() + .with_service_operation() + .with_status_code(); let http_status_errors_total = { let metric = observe::MetricValue::HttpStatusErrorsTotal; register_int_counter_vec_with_registry!( metric.name(), metric.help(), - labels_with_status_code.as_ref(), + http_labels_with_status_code.as_ref(), registry ) .map_err(parse_prometheus_error)? @@ -562,72 +565,72 @@ impl observe::MetricsIntercept for PrometheusInterceptor { match value { observe::MetricValue::OperationBytes(v) => self .operation_bytes - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .observe(v as f64), observe::MetricValue::OperationBytesRate(v) => self .operation_bytes_rate - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .observe(v), observe::MetricValue::OperationEntries(v) => self .operation_entries - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .observe(v as f64), observe::MetricValue::OperationEntriesRate(v) => self .operation_entries_rate - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .observe(v), observe::MetricValue::OperationDurationSeconds(v) => self .operation_duration_seconds - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .observe(v.as_secs_f64()), observe::MetricValue::OperationErrorsTotal => self .operation_errors_total - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .inc(), observe::MetricValue::OperationExecuting(v) => self .operation_executing - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .add(v as i64), observe::MetricValue::OperationTtfbSeconds(v) => self .operation_ttfb_seconds - .with_label_values(&labels.values()) + .with_label_values(&labels.op_values()) .observe(v.as_secs_f64()), observe::MetricValue::HttpExecuting(v) => self .http_executing - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .add(v as i64), observe::MetricValue::HttpRequestBytes(v) => self .http_request_bytes - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .observe(v as f64), observe::MetricValue::HttpRequestBytesRate(v) => self .http_request_bytes_rate - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .observe(v), observe::MetricValue::HttpRequestDurationSeconds(v) => self .http_request_duration_seconds - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .observe(v.as_secs_f64()), observe::MetricValue::HttpResponseBytes(v) => self .http_response_bytes - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .observe(v as f64), observe::MetricValue::HttpResponseBytesRate(v) => self .http_response_bytes_rate - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .observe(v), observe::MetricValue::HttpResponseDurationSeconds(v) => self .http_response_duration_seconds - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .observe(v.as_secs_f64()), observe::MetricValue::HttpConnectionErrorsTotal => self .http_connection_errors_total - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .inc(), observe::MetricValue::HttpStatusErrorsTotal => self .http_status_errors_total - .with_label_values(&labels.values()) + .with_label_values(&labels.http_values()) .inc(), _ => {} } @@ -652,6 +655,11 @@ impl OperationLabelNames { self.0.push(observe::LABEL_STATUS_CODE); self } + + fn with_service_operation(mut self) -> Self { + self.0.push(observe::LABEL_SERVICE_OPERATION); + self + } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -667,15 +675,29 @@ impl OperationLabels { ]) } - fn values(&self) -> Vec<&str> { - let mut labels = Vec::with_capacity(6); + fn op_values(&self) -> Vec<&str> { + let mut labels = vec![ + self.0.scheme, + self.0.namespace.as_ref(), + self.0.root.as_ref(), + self.0.operation, + ]; + + if let Some(error) = self.0.error { + labels.push(error.into_static()); + } + + labels + } - labels.extend([ + fn http_values(&self) -> Vec<&str> { + let mut labels = vec![ self.0.scheme, self.0.namespace.as_ref(), self.0.root.as_ref(), self.0.operation, - ]); + self.0.service_operation.unwrap_or("unknown"), + ]; if let Some(error) = self.0.error { labels.push(error.into_static()); diff --git a/core/services/s3/src/core.rs b/core/services/s3/src/core.rs index 643c95dc98cb..fe9e5000998d 100644 --- a/core/services/s3/src/core.rs +++ b/core/services/s3/src/core.rs @@ -417,7 +417,9 @@ impl S3Core { req = self.insert_request_payer_header(req); // Inject operation to the request. - req = req.extension(Operation::Stat); + req = req + .extension(Operation::Stat) + .extension(ServiceOperation("HeadObject")); let req = req.body(Buffer::new()).map_err(new_request_build_error)?; @@ -499,7 +501,9 @@ impl S3Core { req = self.insert_sse_headers(req, false); // Inject operation to the request. - req = req.extension(Operation::Read); + req = req + .extension(Operation::Read) + .extension(ServiceOperation("GetObject")); let req = req.body(Buffer::new()).map_err(new_request_build_error)?; @@ -544,7 +548,9 @@ impl S3Core { } // Inject operation to the request. - req = req.extension(Operation::Write); + req = req + .extension(Operation::Write) + .extension(ServiceOperation("PutObject")); // Set body let req = req.body(body).map_err(new_request_build_error)?; @@ -587,7 +593,9 @@ impl S3Core { } // Inject operation to the request. - req = req.extension(Operation::Write); + req = req + .extension(Operation::Write) + .extension(ServiceOperation("PutObject")); // Set body let req = req.body(body).map_err(new_request_build_error)?; @@ -627,6 +635,7 @@ impl S3Core { let req = req // Inject operation to the request. .extension(Operation::Delete) + .extension(ServiceOperation("DeleteObject")) .body(Buffer::new()) .map_err(new_request_build_error)?; @@ -687,6 +696,7 @@ impl S3Core { let req = req // Inject operation to the request. .extension(Operation::Copy) + .extension(ServiceOperation("CopyObject")) .header(constants::X_AMZ_COPY_SOURCE, &source) .body(Buffer::new()) .map_err(new_request_build_error)?; @@ -726,6 +736,7 @@ impl S3Core { let req = req // Inject operation to the request. .extension(Operation::List) + .extension(ServiceOperation("ListObjects")) .body(Buffer::new()) .map_err(new_request_build_error)?; @@ -776,6 +787,7 @@ impl S3Core { let req = req // Inject operation to the request. .extension(Operation::List) + .extension(ServiceOperation("ListObjectsV2")) .body(Buffer::new()) .map_err(new_request_build_error)?; @@ -845,7 +857,9 @@ impl S3Core { req = self.insert_checksum_type_header(req); // Inject operation to the request. - req = req.extension(Operation::Write); + req = req + .extension(Operation::Write) + .extension(ServiceOperation("CreateMultipartUpload")); let req = req.body(Buffer::new()).map_err(new_request_build_error)?; @@ -887,7 +901,9 @@ impl S3Core { } // Inject operation to the request. - req = req.extension(Operation::Write); + req = req + .extension(Operation::Write) + .extension(ServiceOperation("UploadPart")); // Set body let req = req.body(body).map_err(new_request_build_error)?; @@ -935,7 +951,9 @@ impl S3Core { req = self.insert_request_payer_header(req); // Inject operation to the request. - req = req.extension(Operation::Write); + req = req + .extension(Operation::Write) + .extension(ServiceOperation("CompleteMultipartUpload")); let req = req .body(Buffer::from(Bytes::from(content))) @@ -967,6 +985,7 @@ impl S3Core { let req = req // Inject operation to the request. .extension(Operation::Write) + .extension(ServiceOperation("AbortMultipartUpload")) .body(Buffer::new()) .map_err(new_request_build_error)?; @@ -1003,7 +1022,9 @@ impl S3Core { req = self.insert_request_payer_header(req); // Inject operation to the request. - req = req.extension(Operation::Delete); + req = req + .extension(Operation::Delete) + .extension(ServiceOperation("DeleteObjects")); let req = req .body(Buffer::from(Bytes::from(content))) @@ -1055,6 +1076,7 @@ impl S3Core { let req = req // Inject operation to the request. .extension(Operation::List) + .extension(ServiceOperation("ListObjectVersions")) .body(Buffer::new()) .map_err(new_request_build_error)?;