diff --git a/.schema/spicepod.schema.json b/.schema/spicepod.schema.json index f1b03ab627..b7ac07eb19 100644 --- a/.schema/spicepod.schema.json +++ b/.schema/spicepod.schema.json @@ -887,6 +887,13 @@ "description": "Aggregation temporality preference for exported metrics.\n\nDefaults to `delta`, which is required by Datadog and recommended for\nmost push-based OTLP backends (`CloudWatch`, New Relic, etc.). Set to\n`cumulative` when forwarding to an `OTel` collector that feeds Prometheus\nor another cumulative-native backend. See [`OtelTemporality`] for\ndetails.", "$ref": "#/$defs/OtelTemporality", "default": "delta" + }, + "prefix": { + "description": "Optional prefix prepended to every exported metric name.\n\nUseful for namespacing Spice metrics in shared backends so they don't\ncollide with metrics from other services. For example, with\n`prefix: \"spiceai.\"` the runtime metric `query_duration_ms` is exported\nas `spiceai.query_duration_ms`.\n\nThe prefix is applied via an `OpenTelemetry` `View` on the runtime's\n`MeterProvider`, so it affects every metric reader attached to that\nprovider (the `otel_exporter` push reader, the Prometheus scrape\nendpoint, and the cluster on-demand OTLP reader). In practice users\ntypically enable only one of these at a time, so the prefix appears on\nexactly the destination they configured it for.", + "type": [ + "string", + "null" + ] } }, "additionalProperties": false, diff --git a/Cargo.lock b/Cargo.lock index 8a236a5e7b..883c22fb6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4002,6 +4002,7 @@ dependencies = [ "token_provider", "tokio", "tracing", + "url", ] [[package]] diff --git a/Dockerfile.local b/Dockerfile.local index 904d855b84..1b5df68221 100644 --- a/Dockerfile.local +++ b/Dockerfile.local @@ -1,4 +1,4 @@ -FROM ubuntu:22.04 +FROM ubuntu:24.04 RUN apt-get update \ && apt-get install --yes --no-install-recommends ca-certificates libssl3 tzdata \ diff --git a/bin/spiced/src/lib.rs b/bin/spiced/src/lib.rs index 55c4b03d7e..e573b27980 100644 --- a/bin/spiced/src/lib.rs +++ b/bin/spiced/src/lib.rs @@ -567,12 +567,15 @@ pub async fn run(args: Args) -> Result<()> { std::collections::HashMap::new() }; + let metric_prefix = telemetry_config.get().and_then(|c| c.metric_prefix.clone()); + init_metrics( &rt.datafusion(), prometheus_registry.clone(), otel_config, resolved_otel_headers, metrics_reader, + metric_prefix, ) .context(UnableToInitializeMetricsSnafu)?; } @@ -718,11 +721,43 @@ fn init_metrics( otel_config: Option<&app::spicepod::component::runtime::OtelExporterConfig>, resolved_otel_headers: std::collections::HashMap, metrics_reader: Option, + metric_prefix: Option, ) -> Result<(), Box> { let resource = Resource::builder().build(); let mut provider_builder = SdkMeterProvider::builder().with_resource(resource); + // Optional metric name prefix (e.g. "spiceai.") configured under + // `runtime.telemetry.metric_prefix`. Applied via an OTel View on the + // MeterProvider, so the rename happens once at the SDK layer and is + // observed by every reader attached below (Prometheus scrape, cluster + // on-demand OTLP, OTEL push). The prefix is intentionally placed at the + // telemetry level rather than under any single exporter because + // OpenTelemetry 0.31's SDK does not support per-reader name transforms. + if let Some(prefix) = metric_prefix.filter(|p| !p.is_empty()) { + tracing::info!(prefix = %prefix, "OTEL metrics name prefix enabled"); + provider_builder = provider_builder.with_view( + move |instrument: &opentelemetry_sdk::metrics::Instrument| { + let new_name = format!("{prefix}{}", instrument.name()); + match opentelemetry_sdk::metrics::Stream::builder() + .with_name(new_name.clone()) + .build() + { + Ok(stream) => Some(stream), + Err(e) => { + tracing::warn!( + instrument = %instrument.name(), + new_name = %new_name, + error = %e, + "Failed to apply OTEL metric prefix; instrument will keep its original name" + ); + None + } + } + }, + ); + } + // Case 1: Prometheus scrape if let Some(registry) = registry { let prometheus_exporter = opentelemetry_prometheus::exporter() diff --git a/crates/data-connectors/connector-databricks/Cargo.toml b/crates/data-connectors/connector-databricks/Cargo.toml index baee98ab80..243808c7c2 100644 --- a/crates/data-connectors/connector-databricks/Cargo.toml +++ b/crates/data-connectors/connector-databricks/Cargo.toml @@ -38,6 +38,7 @@ snafu.workspace = true token_provider = { path = "../../token_provider" } tokio.workspace = true tracing.workspace = true +url.workspace = true [features] default = [] diff --git a/crates/data-connectors/connector-databricks/src/lib.rs b/crates/data-connectors/connector-databricks/src/lib.rs index 72d84f461e..0b462b99db 100644 --- a/crates/data-connectors/connector-databricks/src/lib.rs +++ b/crates/data-connectors/connector-databricks/src/lib.rs @@ -35,6 +35,8 @@ use data_components::unity_catalog::{ }; use data_components::{Read, RefreshableCatalogProvider}; use datafusion::datasource::TableProvider; +use datafusion::datasource::listing::ListingTableUrl; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::sql::TableReference; use opentelemetry::KeyValue; use runtime::Runtime; @@ -53,7 +55,6 @@ use runtime::token_providers::databricks::{ AuthCredentials, DatabricksM2MTokenProvider, DatabricksU2MTokenProvider, }; use runtime_secrets::get_params_with_secrets; -#[cfg(feature = "spark")] use secrecy::ExposeSecret; use secrecy::SecretString; use snafu::prelude::*; @@ -222,6 +223,16 @@ pub struct Databricks { /// Unity Catalog client for table type detection and permission checking. /// Present when the connector was created with enough information to call UC APIs. uc_client: Option>, + /// Typed handle to the Delta read provider, present only in `delta_lake` + /// mode. Used by `register_object_stores` to resolve table storage + /// locations (which are only known after a UC round-trip) so the + /// underlying object store can be registered on the cluster executor's + /// runtime env. + delta_provider: Option>, + /// Original connector params, retained so `register_object_stores` can + /// build the storage URL fragment understood by `SpiceObjectStoreRegistry`. + /// Present only in `delta_lake` mode. + storage_params: Option, } impl std::fmt::Debug for Databricks { @@ -313,9 +324,12 @@ impl Databricks { initialization, metrics, uc_client, + delta_provider: None, + storage_params: None, }) } "delta_lake" => { + let storage_params = params.clone(); let storage_options = params.to_secret_map(); let token_provider: Arc = match auth_credentials { AuthCredentials::Token(token) => { @@ -358,12 +372,15 @@ impl Databricks { token_provider, io_runtime, ); + let delta_provider = Arc::new(read_provider); Ok(Self { - read_provider: Arc::new(read_provider), + read_provider: Arc::clone(&delta_provider) as Arc, initialization, metrics: None, uc_client, + delta_provider: Some(delta_provider), + storage_params: Some(storage_params), }) } #[cfg(feature = "spark")] @@ -537,6 +554,8 @@ impl Databricks { initialization: ComponentInitialization::default(), metrics: None, uc_client: None, + delta_provider: None, + storage_params: None, }) } @@ -855,6 +874,78 @@ impl DataConnector for Databricks { }) as Arc }) } + + async fn register_object_stores( + &self, + dataset: &Dataset, + runtime_env: &Arc, + ) -> DataConnectorResult<()> { + // Only `delta_lake` mode produces object-store-backed scans on the + // executor. `sql_warehouse` and `spark_connect` execute on Databricks + // and surface as Flight/Arrow streams; nothing to register. + let (Some(delta), Some(params)) = (&self.delta_provider, &self.storage_params) else { + return Ok(()); + }; + + // Resolve the underlying storage location via Unity Catalog. This is + // the bare URL (e.g. `s3://databricks-workspace-stack-bfa88-bucket/...`) + // that DataFusion will look up in `runtime_env().object_store(url)` + // when executing the decoded `ParquetSource` on the executor. + let table_reference = TableReference::from(dataset.path()); + let storage_location = + delta + .resolve_table_uri(table_reference) + .await + .map_err(|source| DataConnectorError::UnableToConnectInternal { + dataconnector: "databricks".to_string(), + connector_component: ConnectorComponent::from(dataset), + source, + })?; + + let mut parsed = url::Url::parse(&storage_location).map_err(|source| { + DataConnectorError::UnableToConnectInternal { + dataconnector: "databricks".to_string(), + connector_component: ConnectorComponent::from(dataset), + source: Box::new(source), + } + })?; + + // Encode the connector's storage params as the URL fragment so + // `SpiceObjectStoreRegistry::get_store` can build the right object + // store. `storage_registry_params` returns just the AWS/Azure/GCS + // entries with their prefixed names rewritten to the registry's + // canonical names; Databricks-internal params (`endpoint`, `token`) + // are excluded. + let mut fragment_builder = url::form_urlencoded::Serializer::new(String::new()); + for (key, value) in params.storage_registry_params() { + fragment_builder.append_pair(&key, value.expose_secret()); + } + parsed.set_fragment(Some(fragment_builder.finish().as_str())); + + let listing_url = ListingTableUrl::parse(parsed).map_err(|source| { + DataConnectorError::UnableToConnectInternal { + dataconnector: "databricks".to_string(), + connector_component: ConnectorComponent::from(dataset), + source: Box::new(source), + } + })?; + + runtime_env.object_store(&listing_url).map_err(|source| { + DataConnectorError::UnableToConnectInternal { + dataconnector: "databricks".to_string(), + connector_component: ConnectorComponent::from(dataset), + source: Box::new(source), + } + })?; + + let mut redacted = >::as_ref(&listing_url).clone(); + redacted.set_fragment(None); + tracing::debug!( + "Configured object storage for Databricks Dataset {} ({redacted})", + dataset.name, + ); + Ok(()) + } } /// Classifies a table-provider error, promoting Databricks-specific @@ -1618,6 +1709,8 @@ mod tests { UnityCatalogClient::new(Endpoint(endpoint), None, None) .expect("mock Unity Catalog client should be created"), )), + delta_provider: None, + storage_params: None, }; let dataset = make_dataset(dataset_from, "tpch_sf400_part").await; diff --git a/crates/data-connectors/connector-odbc/src/lib.rs b/crates/data-connectors/connector-odbc/src/lib.rs index 631a2ab425..d87a772ced 100644 --- a/crates/data-connectors/connector-odbc/src/lib.rs +++ b/crates/data-connectors/connector-odbc/src/lib.rs @@ -99,6 +99,8 @@ fn databricks_dialect() -> CustomDialect { CustomDialectBuilder::new() .with_identifier_quote_style('`') .with_interval_style(IntervalStyle::MySQL) + .with_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None)) + .with_large_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None)) .build() } @@ -106,6 +108,7 @@ fn athena_dialect() -> CustomDialect { CustomDialectBuilder::new() .with_interval_style(IntervalStyle::MySQL) .with_date_field_extract_style(DateFieldExtractStyle::Extract) + .with_large_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::Varchar(None)) .build() } @@ -318,6 +321,29 @@ where #[cfg(test)] mod test { use super::*; + use datafusion::arrow::datatypes::DataType; + use datafusion::common::Result; + use datafusion::logical_expr::cast; + use datafusion::prelude::col; + use datafusion::sql::unparser::Unparser; + + #[test] + fn test_athena_dialect_overrides() -> Result<()> { + let dialect = athena_dialect(); + let unparser = Unparser::new(&dialect); + + // large_utf8_cast_dtype: VARCHAR instead of TEXT + let expr = cast(col("a"), DataType::LargeUtf8); + let actual = format!("{}", unparser.expr_to_sql(&expr)?); + assert_eq!(actual, "CAST(a AS VARCHAR)"); + + // utf8_cast_dtype: VARCHAR + let expr = cast(col("a"), DataType::Utf8); + let actual = format!("{}", unparser.expr_to_sql(&expr)?); + assert_eq!(actual, "CAST(a AS VARCHAR)"); + + Ok(()) + } #[test] fn test_odbc_driver_is_file() { diff --git a/crates/data_components/src/databricks/dialect.rs b/crates/data_components/src/databricks/dialect.rs index c247791ea2..2b8a4ac4b0 100644 --- a/crates/data_components/src/databricks/dialect.rs +++ b/crates/data_components/src/databricks/dialect.rs @@ -55,6 +55,16 @@ impl Dialect for DatabricksDialect { IntervalStyle::MySQL } + /// Databricks/Spark SQL uses STRING for UTF8 strings. + fn utf8_cast_dtype(&self) -> ast::DataType { + ast::DataType::String(None) + } + + /// Databricks/Spark SQL uses STRING for large UTF8 strings. + fn large_utf8_cast_dtype(&self) -> ast::DataType { + ast::DataType::String(None) + } + /// Override scalar functions to translate `DataFusion` functions to Spark SQL equivalents. fn scalar_function_to_sql_overrides( &self, @@ -147,6 +157,30 @@ mod tests { assert_eq!(dialect.identifier_quote_style("column_name"), Some('`')); } + #[test] + fn test_dialect_strings_overrides() -> datafusion::common::Result<()> { + let dialect = create_dialect(); + let unparser = Unparser::new(&dialect); + + // utf8_cast_dtype: STRING instead of VARCHAR + let expr = datafusion::logical_expr::cast( + datafusion::prelude::col("a"), + datafusion::arrow::datatypes::DataType::Utf8, + ); + let actual = format!("{}", unparser.expr_to_sql(&expr)?); + assert_eq!(actual, "CAST(`a` AS STRING)"); + + // large_utf8_cast_dtype: STRING instead of TEXT + let expr = datafusion::logical_expr::cast( + datafusion::prelude::col("a"), + datafusion::arrow::datatypes::DataType::LargeUtf8, + ); + let actual = format!("{}", unparser.expr_to_sql(&expr)?); + assert_eq!(actual, "CAST(`a` AS STRING)"); + + Ok(()) + } + #[test] fn test_interval_style() { let dialect = create_dialect(); diff --git a/crates/data_components/src/spark_connect/federation.rs b/crates/data_components/src/spark_connect/federation.rs index 05463a60d6..f26a3d4e80 100644 --- a/crates/data_components/src/spark_connect/federation.rs +++ b/crates/data_components/src/spark_connect/federation.rs @@ -70,6 +70,8 @@ impl SQLExecutor for SparkConnectTableProvider { CustomDialectBuilder::new() .with_interval_style(datafusion::sql::unparser::dialect::IntervalStyle::SQLStandard) .with_identifier_quote_style('`') + .with_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None)) + .with_large_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None)) .build(), ) } diff --git a/crates/runtime-parameters/src/lib.rs b/crates/runtime-parameters/src/lib.rs index 03cb34cf2c..3500386595 100644 --- a/crates/runtime-parameters/src/lib.rs +++ b/crates/runtime-parameters/src/lib.rs @@ -358,6 +358,30 @@ impl Parameters { self.params = params.into_iter().collect(); } + + /// Returns the subset of params that map to `SpiceObjectStoreRegistry` + /// configuration keys, with the prefixed names (`aws_*`, `azure_storage_*`, + /// `google_*`) rewritten to the registry-facing names (`key`, `secret`, + /// `account`, etc.). + /// + /// Connector-internal params that aren't part of an object store mapping + /// (e.g. a Databricks workspace `endpoint`/`token`) are excluded, so this + /// is safe to encode directly into an object-store URL fragment. + #[must_use] + pub fn storage_registry_params(&self) -> Vec<(String, SecretString)> { + let map: HashMap<_, _> = self.params.iter().cloned().collect(); + let mut out = Vec::new(); + for (prefixed_key, registry_key) in AWS_PREFIXED_FRAGMENT_PARAMS + .iter() + .chain(AZURE_PREFIXED_FRAGMENT_PARAMS.iter()) + .chain(GCS_PREFIXED_FRAGMENT_PARAMS.iter()) + { + if let Some(value) = map.get(*prefixed_key) { + out.push(((*registry_key).to_string(), value.clone())); + } + } + out + } } #[derive(Clone)] diff --git a/crates/runtime/src/cluster/mod.rs b/crates/runtime/src/cluster/mod.rs index 486742801e..36dec1581e 100644 --- a/crates/runtime/src/cluster/mod.rs +++ b/crates/runtime/src/cluster/mod.rs @@ -21,8 +21,6 @@ use crate::cluster::partition::{ scheduler_task::{PartitionManagementConfig, PartitionManagementTask}, }; use crate::config::{ClusterConfig, ClusterRole}; -use crate::dataconnector::listing; -use crate::dataconnector::parameters::ConnectorParamsBuilder; use crate::jobs::JobExecutor; use crate::status::ComponentStatus; use crate::{ @@ -55,7 +53,6 @@ use ballista_scheduler::scheduler_server::SchedulerServer; use ballista_scheduler::state::execution_graph::RunningTaskInfo; use datafusion::codec::spice_logical_codec::SpiceLogicalCodec; use datafusion::codec::spice_physical_codec::SpicePhysicalCodec; -use datafusion_datasource::ListingTableUrl; use datafusion_expr::Expr; use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; use futures::future::try_join_all; @@ -1803,8 +1800,15 @@ async fn executor_bind_app( Ok(()) } -/// Traverses dataset definitions and reifies `ListingTableUrl`s, triggering object store -/// registration for each. +/// For each registered dataset on the cluster executor, asks its data +/// connector to register any object stores it needs against the executor's +/// runtime env. +/// +/// On the executor, decoded `ParquetSource` (and other file-source) plans +/// arrive without their `parquet_file_reader_factory`, so `DataFusion` falls +/// back to `runtime_env().object_store(url)`. This function gives each +/// connector a chance to populate that registry using the dataset's +/// already-secret-expanded params. async fn executor_bind_object_stores(rt: Arc) -> crate::Result<()> { let app = rt.app(); let app = app.read().await; @@ -1813,66 +1817,31 @@ async fn executor_bind_object_stores(rt: Arc) -> crate::Result<()> { source: "Runtime did not bind an App.".into(), }); }; + let runtime_env = rt.df.ctx.runtime_env(); for dataset in Arc::clone(&rt).get_valid_datasets(app, LogErrors(true)) { - let mut params = ConnectorParamsBuilder::new(dataset.source().into(), (&dataset).into()) - .build(Arc::clone(&rt.secrets), rt.tokio_io_runtime()) + let connector = match Arc::clone(&rt) + .get_dataconnector_from_dataset(Arc::clone(&dataset)) .await - .context(FailedToStartClusterExecutorSnafu)?; - - // Either this is a URL with a scheme, or a URL with a connector name prefixing it - let url = match dataset.from.as_str().split_once(':') { - Some((_, rest)) if !rest.starts_with("//") => rest, - _ => dataset.from.as_str(), - }; - - let Ok(mut parsed) = Url::parse(url) else { - tracing::warn!("Unable to configure Dataset URL {}", url); - continue; + { + Ok(connector) => connector, + Err(error) => { + tracing::warn!( + "Skipping object store registration for dataset {}: {error}", + dataset.name + ); + continue; + } }; - if parsed.scheme() == "file" { + if let Err(error) = connector + .register_object_stores(&dataset, &runtime_env) + .await + { tracing::warn!( - "Dataset {} has a file:// scheme and may not be resolvable without a shared mount.", + "Failed to register object stores for dataset {}: {error}", dataset.name ); - continue; } - - // Not all connectors have the same parameter structures for S3 -- this makes all fragment - // keys match the spec expected by the S3 connector and `SpiceObjectRegistry`. - params.parameters.canonicalize_s3_fragments(); - - // Canonicalize Azure parameters (e.g., `azure_storage_account_name` -> `account`) - // for Delta Lake and other connectors that use Azure-prefixed parameter names. - params.parameters.canonicalize_azure_fragments(); - - // Canonicalize GCS parameters (e.g., `google_service_account` -> `service_account`) - // for Delta Lake and other connectors that use GCS-prefixed parameter names. - params.parameters.canonicalize_gcs_fragments(); - - let unprefixed = params - .parameters - .into_iter() - .map(|(k, _)| k.as_str()) - .collect::>(); - - parsed.set_fragment(Some( - listing::build_fragments(¶ms.parameters, unprefixed).as_str(), - )); - - let listing_table_url = ListingTableUrl::parse(parsed) - .boxed() - .context(FailedToStartClusterExecutorSnafu)?; - - let _ = rt - .df - .ctx - .runtime_env() - .object_store(listing_table_url) - .boxed() - .context(FailedToStartClusterExecutorSnafu)?; - - tracing::info!("Configured object storage for Dataset {}", dataset.name); } Ok(()) diff --git a/crates/runtime/src/dataconnector/listing/connector.rs b/crates/runtime/src/dataconnector/listing/connector.rs index f97532b820..a2160f186d 100644 --- a/crates/runtime/src/dataconnector/listing/connector.rs +++ b/crates/runtime/src/dataconnector/listing/connector.rs @@ -1217,6 +1217,47 @@ impl DataConnector for T { } } + async fn register_object_stores( + &self, + dataset: &Dataset, + runtime_env: &Arc, + ) -> DataConnectorResult<()> { + let url = self.get_object_store_url(dataset, None)?; + if url.scheme() == "file" { + tracing::warn!( + "Dataset {} has a file:// scheme and may not be resolvable on cluster executors without a shared mount.", + dataset.name + ); + return Ok(()); + } + + let listing_url = ListingTableUrl::parse(url).boxed().context( + crate::dataconnector::UnableToConnectInternalSnafu { + dataconnector: format!("{self}"), + connector_component: ConnectorComponent::from(dataset), + }, + )?; + + // Triggers SpiceObjectStoreRegistry::get_store, which builds an object + // store from the URL fragment params (already secret-expanded by + // ConnectorParamsBuilder when the connector was created) and registers + // it on the runtime env keyed by the bare URL. + runtime_env.object_store(&listing_url).boxed().context( + crate::dataconnector::UnableToConnectInternalSnafu { + dataconnector: format!("{self}"), + connector_component: ConnectorComponent::from(dataset), + }, + )?; + + let mut redacted = >::as_ref(&listing_url).clone(); + redacted.set_fragment(None); + tracing::debug!( + "Configured object storage for Dataset {} ({redacted})", + dataset.name, + ); + Ok(()) + } + /// A hook that is called when an accelerated table is registered to the /// `DataFusion` context for this data connector. /// diff --git a/crates/runtime/src/dataconnector/mod.rs b/crates/runtime/src/dataconnector/mod.rs index edf88ef40e..cd044faf5e 100644 --- a/crates/runtime/src/dataconnector/mod.rs +++ b/crates/runtime/src/dataconnector/mod.rs @@ -567,6 +567,27 @@ pub trait DataConnector: Debug + Send + Sync + 'static { None } + /// Pre-register any object stores this connector needs in order to execute + /// scans for `dataset` against the supplied `runtime_env`. + /// + /// Called on cluster executor startup so that physical plans decoded from + /// the scheduler can resolve their object stores via + /// `runtime_env().object_store(url)` even when the per-scan + /// `parquet_file_reader_factory` (or equivalent) is dropped during proto + /// round-trip. + /// + /// The default implementation is a no-op. Connectors backed by per-table + /// object stores (object-store-style connectors, Delta on S3/Azure/GCS, + /// Iceberg, etc.) should override this to register the appropriate stores + /// using the dataset's already secret-expanded params. + async fn register_object_stores( + &self, + _dataset: &Dataset, + _runtime_env: &Arc, + ) -> DataConnectorResult<()> { + Ok(()) + } + /// A hook that is called when an accelerated table is registered to the /// `DataFusion` context for this data connector. /// diff --git a/crates/spicepod/src/component/runtime.rs b/crates/spicepod/src/component/runtime.rs index 0c3a517a62..f1a0ca3fac 100644 --- a/crates/spicepod/src/component/runtime.rs +++ b/crates/spicepod/src/component/runtime.rs @@ -329,8 +329,31 @@ pub struct TelemetryConfig { pub enabled: bool, #[serde(default)] pub user_agent_collection: UserAgentCollection, + /// Custom key/value attributes attached to telemetry metrics emitted by + /// spiced. Applied as OpenTelemetry resource attributes on the runtime's + /// `MeterProvider`, so they appear as dimensions on every metric exported + /// via the Prometheus scrape endpoint, the cluster on-demand OTLP reader, + /// and the `otel_exporter` push exporter, and as labels on anonymous + /// usage telemetry. Currently does not affect tracing spans or logs. + /// Example: `{ environment: prod, region: us-west-2, team: data-platform }`. #[serde(default)] pub properties: HashMap, + /// Optional prefix prepended to every exported metric name. + /// + /// Useful for namespacing Spice metrics in shared backends (e.g. Datadog, + /// Grafana Cloud) so they don't collide with metrics from other services. + /// For example, with `metric_prefix: "spiceai."` the runtime metric + /// `query_duration_ms` is exported as `spiceai.query_duration_ms`. + /// + /// The prefix is applied via an `OpenTelemetry` `View` on the runtime's + /// `MeterProvider`, so it affects every metric reader attached to that + /// provider — the Prometheus scrape endpoint (`--metrics`), the cluster + /// on-demand OTLP reader, and the `otel_exporter` push reader. + /// `OpenTelemetry` 0.31's SDK does not support per-reader name transforms, + /// so this knob is intentionally placed at the telemetry level rather + /// than under any single exporter. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub metric_prefix: Option, /// Optional configuration for pushing metrics to an OpenTelemetry collector #[serde(default, skip_serializing_if = "Option::is_none")] pub otel_exporter: Option, @@ -342,6 +365,7 @@ impl Default for TelemetryConfig { enabled: true, user_agent_collection: UserAgentCollection::default(), properties: HashMap::new(), + metric_prefix: None, otel_exporter: None, } } @@ -1575,6 +1599,29 @@ mod tests { ); } + #[test] + fn test_metric_prefix_default_is_none() { + let yaml = r" + telemetry: + otel_exporter: + endpoint: otel-collector + "; + let runtime: Runtime = yaml::from_str(yaml).expect("Failed to parse Runtime"); + assert_eq!(runtime.telemetry.metric_prefix, None); + } + + #[test] + fn test_metric_prefix_parsing() { + let yaml = r#" + telemetry: + metric_prefix: "spiceai." + otel_exporter: + endpoint: otel-collector + "#; + let runtime: Runtime = yaml::from_str(yaml).expect("Failed to parse Runtime"); + assert_eq!(runtime.telemetry.metric_prefix.as_deref(), Some("spiceai.")); + } + #[test] fn test_otel_exporter_push_interval_duration_parsing() { let config = OtelExporterConfig {