Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .schema/spicepod.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,13 @@
"description": "Aggregation temporality preference for exported metrics.\n\nDefaults to `delta`, which is required by Datadog and recommended for\nmost push-based OTLP backends (`CloudWatch`, New Relic, etc.). Set to\n`cumulative` when forwarding to an `OTel` collector that feeds Prometheus\nor another cumulative-native backend. See [`OtelTemporality`] for\ndetails.",
"$ref": "#/$defs/OtelTemporality",
"default": "delta"
},
"prefix": {
"description": "Optional prefix prepended to every exported metric name.\n\nUseful for namespacing Spice metrics in shared backends so they don't\ncollide with metrics from other services. For example, with\n`prefix: \"spiceai.\"` the runtime metric `query_duration_ms` is exported\nas `spiceai.query_duration_ms`.\n\nThe prefix is applied via an `OpenTelemetry` `View` on the runtime's\n`MeterProvider`, so it affects every metric reader attached to that\nprovider (the `otel_exporter` push reader, the Prometheus scrape\nendpoint, and the cluster on-demand OTLP reader). In practice users\ntypically enable only one of these at a time, so the prefix appears on\nexactly the destination they configured it for.",
"type": [
"string",
"null"
]
}
},
"additionalProperties": false,
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Dockerfile.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ubuntu:22.04
FROM ubuntu:24.04

RUN apt-get update \
&& apt-get install --yes --no-install-recommends ca-certificates libssl3 tzdata \
Expand Down
35 changes: 35 additions & 0 deletions bin/spiced/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,15 @@ pub async fn run(args: Args) -> Result<()> {
std::collections::HashMap::new()
};

let metric_prefix = telemetry_config.get().and_then(|c| c.metric_prefix.clone());

init_metrics(
&rt.datafusion(),
prometheus_registry.clone(),
otel_config,
resolved_otel_headers,
metrics_reader,
metric_prefix,
)
.context(UnableToInitializeMetricsSnafu)?;
}
Expand Down Expand Up @@ -718,11 +721,43 @@ fn init_metrics(
otel_config: Option<&app::spicepod::component::runtime::OtelExporterConfig>,
resolved_otel_headers: std::collections::HashMap<String, String>,
metrics_reader: Option<runtime::metrics_reader::MetricsReader>,
metric_prefix: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let resource = Resource::builder().build();

let mut provider_builder = SdkMeterProvider::builder().with_resource(resource);

// Optional metric name prefix (e.g. "spiceai.") configured under
// `runtime.telemetry.metric_prefix`. Applied via an OTel View on the
// MeterProvider, so the rename happens once at the SDK layer and is
// observed by every reader attached below (Prometheus scrape, cluster
// on-demand OTLP, OTEL push). The prefix is intentionally placed at the
// telemetry level rather than under any single exporter because
// OpenTelemetry 0.31's SDK does not support per-reader name transforms.
if let Some(prefix) = metric_prefix.filter(|p| !p.is_empty()) {
tracing::info!(prefix = %prefix, "OTEL metrics name prefix enabled");
provider_builder = provider_builder.with_view(
move |instrument: &opentelemetry_sdk::metrics::Instrument| {
let new_name = format!("{prefix}{}", instrument.name());
match opentelemetry_sdk::metrics::Stream::builder()
.with_name(new_name.clone())
.build()
{
Ok(stream) => Some(stream),
Err(e) => {
tracing::warn!(
instrument = %instrument.name(),
new_name = %new_name,
error = %e,
"Failed to apply OTEL metric prefix; instrument will keep its original name"
);
None
}
}
},
);
}

// Case 1: Prometheus scrape
if let Some(registry) = registry {
let prometheus_exporter = opentelemetry_prometheus::exporter()
Expand Down
1 change: 1 addition & 0 deletions crates/data-connectors/connector-databricks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ snafu.workspace = true
token_provider = { path = "../../token_provider" }
tokio.workspace = true
tracing.workspace = true
url.workspace = true

[features]
default = []
Expand Down
97 changes: 95 additions & 2 deletions crates/data-connectors/connector-databricks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use data_components::unity_catalog::{
};
use data_components::{Read, RefreshableCatalogProvider};
use datafusion::datasource::TableProvider;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::sql::TableReference;
use opentelemetry::KeyValue;
use runtime::Runtime;
Expand All @@ -53,7 +55,6 @@ use runtime::token_providers::databricks::{
AuthCredentials, DatabricksM2MTokenProvider, DatabricksU2MTokenProvider,
};
use runtime_secrets::get_params_with_secrets;
#[cfg(feature = "spark")]
use secrecy::ExposeSecret;
use secrecy::SecretString;
use snafu::prelude::*;
Expand Down Expand Up @@ -222,6 +223,16 @@ pub struct Databricks {
/// Unity Catalog client for table type detection and permission checking.
/// Present when the connector was created with enough information to call UC APIs.
uc_client: Option<Arc<UnityCatalogClient>>,
/// Typed handle to the Delta read provider, present only in `delta_lake`
/// mode. Used by `register_object_stores` to resolve table storage
/// locations (which are only known after a UC round-trip) so the
/// underlying object store can be registered on the cluster executor's
/// runtime env.
delta_provider: Option<Arc<DatabricksDelta>>,
/// Original connector params, retained so `register_object_stores` can
/// build the storage URL fragment understood by `SpiceObjectStoreRegistry`.
/// Present only in `delta_lake` mode.
storage_params: Option<Parameters>,
}

impl std::fmt::Debug for Databricks {
Expand Down Expand Up @@ -313,9 +324,12 @@ impl Databricks {
initialization,
metrics,
uc_client,
delta_provider: None,
storage_params: None,
})
}
"delta_lake" => {
let storage_params = params.clone();
let storage_options = params.to_secret_map();
let token_provider: Arc<dyn TokenProvider> = match auth_credentials {
AuthCredentials::Token(token) => {
Expand Down Expand Up @@ -358,12 +372,15 @@ impl Databricks {
token_provider,
io_runtime,
);
let delta_provider = Arc::new(read_provider);

Ok(Self {
read_provider: Arc::new(read_provider),
read_provider: Arc::clone(&delta_provider) as Arc<dyn Read>,
initialization,
metrics: None,
uc_client,
delta_provider: Some(delta_provider),
storage_params: Some(storage_params),
})
}
#[cfg(feature = "spark")]
Expand Down Expand Up @@ -537,6 +554,8 @@ impl Databricks {
initialization: ComponentInitialization::default(),
metrics: None,
uc_client: None,
delta_provider: None,
storage_params: None,
})
}

Expand Down Expand Up @@ -855,6 +874,78 @@ impl DataConnector for Databricks {
}) as Arc<dyn MetricsProvider>
})
}

async fn register_object_stores(
&self,
dataset: &Dataset,
runtime_env: &Arc<RuntimeEnv>,
) -> DataConnectorResult<()> {
// Only `delta_lake` mode produces object-store-backed scans on the
// executor. `sql_warehouse` and `spark_connect` execute on Databricks
// and surface as Flight/Arrow streams; nothing to register.
let (Some(delta), Some(params)) = (&self.delta_provider, &self.storage_params) else {
return Ok(());
};

// Resolve the underlying storage location via Unity Catalog. This is
// the bare URL (e.g. `s3://databricks-workspace-stack-bfa88-bucket/...`)
// that DataFusion will look up in `runtime_env().object_store(url)`
// when executing the decoded `ParquetSource` on the executor.
let table_reference = TableReference::from(dataset.path());
let storage_location =
delta
.resolve_table_uri(table_reference)
.await
.map_err(|source| DataConnectorError::UnableToConnectInternal {
dataconnector: "databricks".to_string(),
connector_component: ConnectorComponent::from(dataset),
source,
})?;

let mut parsed = url::Url::parse(&storage_location).map_err(|source| {
DataConnectorError::UnableToConnectInternal {
dataconnector: "databricks".to_string(),
connector_component: ConnectorComponent::from(dataset),
source: Box::new(source),
}
})?;

// Encode the connector's storage params as the URL fragment so
// `SpiceObjectStoreRegistry::get_store` can build the right object
// store. `storage_registry_params` returns just the AWS/Azure/GCS
// entries with their prefixed names rewritten to the registry's
// canonical names; Databricks-internal params (`endpoint`, `token`)
// are excluded.
let mut fragment_builder = url::form_urlencoded::Serializer::new(String::new());
for (key, value) in params.storage_registry_params() {
fragment_builder.append_pair(&key, value.expose_secret());
}
parsed.set_fragment(Some(fragment_builder.finish().as_str()));

let listing_url = ListingTableUrl::parse(parsed).map_err(|source| {
DataConnectorError::UnableToConnectInternal {
dataconnector: "databricks".to_string(),
connector_component: ConnectorComponent::from(dataset),
source: Box::new(source),
}
})?;

runtime_env.object_store(&listing_url).map_err(|source| {
DataConnectorError::UnableToConnectInternal {
dataconnector: "databricks".to_string(),
connector_component: ConnectorComponent::from(dataset),
source: Box::new(source),
}
})?;

let mut redacted = <ListingTableUrl as AsRef<url::Url>>::as_ref(&listing_url).clone();
redacted.set_fragment(None);
tracing::debug!(
"Configured object storage for Databricks Dataset {} ({redacted})",
dataset.name,
);
Ok(())
}
}

/// Classifies a table-provider error, promoting Databricks-specific
Expand Down Expand Up @@ -1618,6 +1709,8 @@ mod tests {
UnityCatalogClient::new(Endpoint(endpoint), None, None)
.expect("mock Unity Catalog client should be created"),
)),
delta_provider: None,
storage_params: None,
};
let dataset = make_dataset(dataset_from, "tpch_sf400_part").await;

Expand Down
26 changes: 26 additions & 0 deletions crates/data-connectors/connector-odbc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ fn databricks_dialect() -> CustomDialect {
CustomDialectBuilder::new()
.with_identifier_quote_style('`')
.with_interval_style(IntervalStyle::MySQL)
.with_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None))
.with_large_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None))
.build()
}

fn athena_dialect() -> CustomDialect {
CustomDialectBuilder::new()
.with_interval_style(IntervalStyle::MySQL)
.with_date_field_extract_style(DateFieldExtractStyle::Extract)
.with_large_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::Varchar(None))
.build()
}

Expand Down Expand Up @@ -318,6 +321,29 @@ where
#[cfg(test)]
mod test {
use super::*;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::logical_expr::cast;
use datafusion::prelude::col;
use datafusion::sql::unparser::Unparser;

#[test]
fn test_athena_dialect_overrides() -> Result<()> {
let dialect = athena_dialect();
let unparser = Unparser::new(&dialect);

// large_utf8_cast_dtype: VARCHAR instead of TEXT
let expr = cast(col("a"), DataType::LargeUtf8);
let actual = format!("{}", unparser.expr_to_sql(&expr)?);
assert_eq!(actual, "CAST(a AS VARCHAR)");

// utf8_cast_dtype: VARCHAR
let expr = cast(col("a"), DataType::Utf8);
let actual = format!("{}", unparser.expr_to_sql(&expr)?);
assert_eq!(actual, "CAST(a AS VARCHAR)");

Ok(())
}

#[test]
fn test_odbc_driver_is_file() {
Expand Down
34 changes: 34 additions & 0 deletions crates/data_components/src/databricks/dialect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ impl Dialect for DatabricksDialect {
IntervalStyle::MySQL
}

/// Databricks/Spark SQL uses STRING for UTF8 strings.
fn utf8_cast_dtype(&self) -> ast::DataType {
ast::DataType::String(None)
}

/// Databricks/Spark SQL uses STRING for large UTF8 strings.
fn large_utf8_cast_dtype(&self) -> ast::DataType {
ast::DataType::String(None)
}

/// Override scalar functions to translate `DataFusion` functions to Spark SQL equivalents.
fn scalar_function_to_sql_overrides(
&self,
Expand Down Expand Up @@ -147,6 +157,30 @@ mod tests {
assert_eq!(dialect.identifier_quote_style("column_name"), Some('`'));
}

#[test]
fn test_dialect_strings_overrides() -> datafusion::common::Result<()> {
let dialect = create_dialect();
let unparser = Unparser::new(&dialect);

// utf8_cast_dtype: STRING instead of VARCHAR
let expr = datafusion::logical_expr::cast(
datafusion::prelude::col("a"),
datafusion::arrow::datatypes::DataType::Utf8,
);
let actual = format!("{}", unparser.expr_to_sql(&expr)?);
assert_eq!(actual, "CAST(`a` AS STRING)");

// large_utf8_cast_dtype: STRING instead of TEXT
let expr = datafusion::logical_expr::cast(
datafusion::prelude::col("a"),
datafusion::arrow::datatypes::DataType::LargeUtf8,
);
let actual = format!("{}", unparser.expr_to_sql(&expr)?);
assert_eq!(actual, "CAST(`a` AS STRING)");

Ok(())
}

#[test]
fn test_interval_style() {
let dialect = create_dialect();
Expand Down
2 changes: 2 additions & 0 deletions crates/data_components/src/spark_connect/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ impl SQLExecutor for SparkConnectTableProvider {
CustomDialectBuilder::new()
.with_interval_style(datafusion::sql::unparser::dialect::IntervalStyle::SQLStandard)
.with_identifier_quote_style('`')
.with_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None))
.with_large_utf8_cast_dtype(datafusion::sql::sqlparser::ast::DataType::String(None))
.build(),
)
}
Expand Down
24 changes: 24 additions & 0 deletions crates/runtime-parameters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,30 @@ impl Parameters {

self.params = params.into_iter().collect();
}

/// Returns the subset of params that map to `SpiceObjectStoreRegistry`
/// configuration keys, with the prefixed names (`aws_*`, `azure_storage_*`,
/// `google_*`) rewritten to the registry-facing names (`key`, `secret`,
/// `account`, etc.).
///
/// Connector-internal params that aren't part of an object store mapping
/// (e.g. a Databricks workspace `endpoint`/`token`) are excluded, so this
/// is safe to encode directly into an object-store URL fragment.
#[must_use]
pub fn storage_registry_params(&self) -> Vec<(String, SecretString)> {
let map: HashMap<_, _> = self.params.iter().cloned().collect();
let mut out = Vec::new();
for (prefixed_key, registry_key) in AWS_PREFIXED_FRAGMENT_PARAMS
.iter()
.chain(AZURE_PREFIXED_FRAGMENT_PARAMS.iter())
.chain(GCS_PREFIXED_FRAGMENT_PARAMS.iter())
{
if let Some(value) = map.get(*prefixed_key) {
out.push(((*registry_key).to_string(), value.clone()));
}
}
out
}
}

#[derive(Clone)]
Expand Down
Loading
Loading