Skip to content

Commit 2dcf253

Browse files
feat(cluster): connector-driven object store registration on executors (spiceai#10414)
1 parent debf255 commit 2dcf253

7 files changed

Lines changed: 209 additions & 59 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/data-connectors/connector-databricks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ snafu.workspace = true
3838
token_provider = { path = "../../token_provider" }
3939
tokio.workspace = true
4040
tracing.workspace = true
41+
url.workspace = true
4142

4243
[features]
4344
default = []

crates/data-connectors/connector-databricks/src/lib.rs

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use data_components::unity_catalog::{
3535
};
3636
use data_components::{Read, RefreshableCatalogProvider};
3737
use datafusion::datasource::TableProvider;
38+
use datafusion::datasource::listing::ListingTableUrl;
39+
use datafusion::execution::runtime_env::RuntimeEnv;
3840
use datafusion::sql::TableReference;
3941
use opentelemetry::KeyValue;
4042
use runtime::Runtime;
@@ -53,7 +55,6 @@ use runtime::token_providers::databricks::{
5355
AuthCredentials, DatabricksM2MTokenProvider, DatabricksU2MTokenProvider,
5456
};
5557
use runtime_secrets::get_params_with_secrets;
56-
#[cfg(feature = "spark")]
5758
use secrecy::ExposeSecret;
5859
use secrecy::SecretString;
5960
use snafu::prelude::*;
@@ -222,6 +223,16 @@ pub struct Databricks {
222223
/// Unity Catalog client for table type detection and permission checking.
223224
/// Present when the connector was created with enough information to call UC APIs.
224225
uc_client: Option<Arc<UnityCatalogClient>>,
226+
/// Typed handle to the Delta read provider, present only in `delta_lake`
227+
/// mode. Used by `register_object_stores` to resolve table storage
228+
/// locations (which are only known after a UC round-trip) so the
229+
/// underlying object store can be registered on the cluster executor's
230+
/// runtime env.
231+
delta_provider: Option<Arc<DatabricksDelta>>,
232+
/// Original connector params, retained so `register_object_stores` can
233+
/// build the storage URL fragment understood by `SpiceObjectStoreRegistry`.
234+
/// Present only in `delta_lake` mode.
235+
storage_params: Option<Parameters>,
225236
}
226237

227238
impl std::fmt::Debug for Databricks {
@@ -313,9 +324,12 @@ impl Databricks {
313324
initialization,
314325
metrics,
315326
uc_client,
327+
delta_provider: None,
328+
storage_params: None,
316329
})
317330
}
318331
"delta_lake" => {
332+
let storage_params = params.clone();
319333
let storage_options = params.to_secret_map();
320334
let token_provider: Arc<dyn TokenProvider> = match auth_credentials {
321335
AuthCredentials::Token(token) => {
@@ -358,12 +372,15 @@ impl Databricks {
358372
token_provider,
359373
io_runtime,
360374
);
375+
let delta_provider = Arc::new(read_provider);
361376

362377
Ok(Self {
363-
read_provider: Arc::new(read_provider),
378+
read_provider: Arc::clone(&delta_provider) as Arc<dyn Read>,
364379
initialization,
365380
metrics: None,
366381
uc_client,
382+
delta_provider: Some(delta_provider),
383+
storage_params: Some(storage_params),
367384
})
368385
}
369386
#[cfg(feature = "spark")]
@@ -537,6 +554,8 @@ impl Databricks {
537554
initialization: ComponentInitialization::default(),
538555
metrics: None,
539556
uc_client: None,
557+
delta_provider: None,
558+
storage_params: None,
540559
})
541560
}
542561

@@ -855,6 +874,78 @@ impl DataConnector for Databricks {
855874
}) as Arc<dyn MetricsProvider>
856875
})
857876
}
877+
878+
async fn register_object_stores(
879+
&self,
880+
dataset: &Dataset,
881+
runtime_env: &Arc<RuntimeEnv>,
882+
) -> DataConnectorResult<()> {
883+
// Only `delta_lake` mode produces object-store-backed scans on the
884+
// executor. `sql_warehouse` and `spark_connect` execute on Databricks
885+
// and surface as Flight/Arrow streams; nothing to register.
886+
let (Some(delta), Some(params)) = (&self.delta_provider, &self.storage_params) else {
887+
return Ok(());
888+
};
889+
890+
// Resolve the underlying storage location via Unity Catalog. This is
891+
// the bare URL (e.g. `s3://databricks-workspace-stack-bfa88-bucket/...`)
892+
// that DataFusion will look up in `runtime_env().object_store(url)`
893+
// when executing the decoded `ParquetSource` on the executor.
894+
let table_reference = TableReference::from(dataset.path());
895+
let storage_location =
896+
delta
897+
.resolve_table_uri(table_reference)
898+
.await
899+
.map_err(|source| DataConnectorError::UnableToConnectInternal {
900+
dataconnector: "databricks".to_string(),
901+
connector_component: ConnectorComponent::from(dataset),
902+
source,
903+
})?;
904+
905+
let mut parsed = url::Url::parse(&storage_location).map_err(|source| {
906+
DataConnectorError::UnableToConnectInternal {
907+
dataconnector: "databricks".to_string(),
908+
connector_component: ConnectorComponent::from(dataset),
909+
source: Box::new(source),
910+
}
911+
})?;
912+
913+
// Encode the connector's storage params as the URL fragment so
914+
// `SpiceObjectStoreRegistry::get_store` can build the right object
915+
// store. `storage_registry_params` returns just the AWS/Azure/GCS
916+
// entries with their prefixed names rewritten to the registry's
917+
// canonical names; Databricks-internal params (`endpoint`, `token`)
918+
// are excluded.
919+
let mut fragment_builder = url::form_urlencoded::Serializer::new(String::new());
920+
for (key, value) in params.storage_registry_params() {
921+
fragment_builder.append_pair(&key, value.expose_secret());
922+
}
923+
parsed.set_fragment(Some(fragment_builder.finish().as_str()));
924+
925+
let listing_url = ListingTableUrl::parse(parsed).map_err(|source| {
926+
DataConnectorError::UnableToConnectInternal {
927+
dataconnector: "databricks".to_string(),
928+
connector_component: ConnectorComponent::from(dataset),
929+
source: Box::new(source),
930+
}
931+
})?;
932+
933+
runtime_env.object_store(&listing_url).map_err(|source| {
934+
DataConnectorError::UnableToConnectInternal {
935+
dataconnector: "databricks".to_string(),
936+
connector_component: ConnectorComponent::from(dataset),
937+
source: Box::new(source),
938+
}
939+
})?;
940+
941+
let mut redacted = <ListingTableUrl as AsRef<url::Url>>::as_ref(&listing_url).clone();
942+
redacted.set_fragment(None);
943+
tracing::debug!(
944+
"Configured object storage for Databricks Dataset {} ({redacted})",
945+
dataset.name,
946+
);
947+
Ok(())
948+
}
858949
}
859950

860951
/// Classifies a table-provider error, promoting Databricks-specific
@@ -1618,6 +1709,8 @@ mod tests {
16181709
UnityCatalogClient::new(Endpoint(endpoint), None, None)
16191710
.expect("mock Unity Catalog client should be created"),
16201711
)),
1712+
delta_provider: None,
1713+
storage_params: None,
16211714
};
16221715
let dataset = make_dataset(dataset_from, "tpch_sf400_part").await;
16231716

crates/runtime-parameters/src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,30 @@ impl Parameters {
358358

359359
self.params = params.into_iter().collect();
360360
}
361+
362+
/// Returns the subset of params that map to `SpiceObjectStoreRegistry`
363+
/// configuration keys, with the prefixed names (`aws_*`, `azure_storage_*`,
364+
/// `google_*`) rewritten to the registry-facing names (`key`, `secret`,
365+
/// `account`, etc.).
366+
///
367+
/// Connector-internal params that aren't part of an object store mapping
368+
/// (e.g. a Databricks workspace `endpoint`/`token`) are excluded, so this
369+
/// is safe to encode directly into an object-store URL fragment.
370+
#[must_use]
371+
pub fn storage_registry_params(&self) -> Vec<(String, SecretString)> {
372+
let map: HashMap<_, _> = self.params.iter().cloned().collect();
373+
let mut out = Vec::new();
374+
for (prefixed_key, registry_key) in AWS_PREFIXED_FRAGMENT_PARAMS
375+
.iter()
376+
.chain(AZURE_PREFIXED_FRAGMENT_PARAMS.iter())
377+
.chain(GCS_PREFIXED_FRAGMENT_PARAMS.iter())
378+
{
379+
if let Some(value) = map.get(*prefixed_key) {
380+
out.push(((*registry_key).to_string(), value.clone()));
381+
}
382+
}
383+
out
384+
}
361385
}
362386

363387
#[derive(Clone)]

crates/runtime/src/cluster/mod.rs

Lines changed: 26 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use crate::cluster::partition::{
2121
scheduler_task::{PartitionManagementConfig, PartitionManagementTask},
2222
};
2323
use crate::config::{ClusterConfig, ClusterRole};
24-
use crate::dataconnector::listing;
25-
use crate::dataconnector::parameters::ConnectorParamsBuilder;
2624
use crate::jobs::JobExecutor;
2725
use crate::status::ComponentStatus;
2826
use crate::{
@@ -55,7 +53,6 @@ use ballista_scheduler::scheduler_server::SchedulerServer;
5553
use ballista_scheduler::state::execution_graph::RunningTaskInfo;
5654
use datafusion::codec::spice_logical_codec::SpiceLogicalCodec;
5755
use datafusion::codec::spice_physical_codec::SpicePhysicalCodec;
58-
use datafusion_datasource::ListingTableUrl;
5956
use datafusion_expr::Expr;
6057
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
6158
use futures::future::try_join_all;
@@ -1803,8 +1800,15 @@ async fn executor_bind_app(
18031800
Ok(())
18041801
}
18051802

1806-
/// Traverses dataset definitions and reifies `ListingTableUrl`s, triggering object store
1807-
/// registration for each.
1803+
/// For each registered dataset on the cluster executor, asks its data
1804+
/// connector to register any object stores it needs against the executor's
1805+
/// runtime env.
1806+
///
1807+
/// On the executor, decoded `ParquetSource` (and other file-source) plans
1808+
/// arrive without their `parquet_file_reader_factory`, so `DataFusion` falls
1809+
/// back to `runtime_env().object_store(url)`. This function gives each
1810+
/// connector a chance to populate that registry using the dataset's
1811+
/// already-secret-expanded params.
18081812
async fn executor_bind_object_stores(rt: Arc<Runtime>) -> crate::Result<()> {
18091813
let app = rt.app();
18101814
let app = app.read().await;
@@ -1813,66 +1817,31 @@ async fn executor_bind_object_stores(rt: Arc<Runtime>) -> crate::Result<()> {
18131817
source: "Runtime did not bind an App.".into(),
18141818
});
18151819
};
1820+
let runtime_env = rt.df.ctx.runtime_env();
18161821
for dataset in Arc::clone(&rt).get_valid_datasets(app, LogErrors(true)) {
1817-
let mut params = ConnectorParamsBuilder::new(dataset.source().into(), (&dataset).into())
1818-
.build(Arc::clone(&rt.secrets), rt.tokio_io_runtime())
1822+
let connector = match Arc::clone(&rt)
1823+
.get_dataconnector_from_dataset(Arc::clone(&dataset))
18191824
.await
1820-
.context(FailedToStartClusterExecutorSnafu)?;
1821-
1822-
// Either this is a URL with a scheme, or a URL with a connector name prefixing it
1823-
let url = match dataset.from.as_str().split_once(':') {
1824-
Some((_, rest)) if !rest.starts_with("//") => rest,
1825-
_ => dataset.from.as_str(),
1826-
};
1827-
1828-
let Ok(mut parsed) = Url::parse(url) else {
1829-
tracing::warn!("Unable to configure Dataset URL {}", url);
1830-
continue;
1825+
{
1826+
Ok(connector) => connector,
1827+
Err(error) => {
1828+
tracing::warn!(
1829+
"Skipping object store registration for dataset {}: {error}",
1830+
dataset.name
1831+
);
1832+
continue;
1833+
}
18311834
};
18321835

1833-
if parsed.scheme() == "file" {
1836+
if let Err(error) = connector
1837+
.register_object_stores(&dataset, &runtime_env)
1838+
.await
1839+
{
18341840
tracing::warn!(
1835-
"Dataset {} has a file:// scheme and may not be resolvable without a shared mount.",
1841+
"Failed to register object stores for dataset {}: {error}",
18361842
dataset.name
18371843
);
1838-
continue;
18391844
}
1840-
1841-
// Not all connectors have the same parameter structures for S3 -- this makes all fragment
1842-
// keys match the spec expected by the S3 connector and `SpiceObjectRegistry`.
1843-
params.parameters.canonicalize_s3_fragments();
1844-
1845-
// Canonicalize Azure parameters (e.g., `azure_storage_account_name` -> `account`)
1846-
// for Delta Lake and other connectors that use Azure-prefixed parameter names.
1847-
params.parameters.canonicalize_azure_fragments();
1848-
1849-
// Canonicalize GCS parameters (e.g., `google_service_account` -> `service_account`)
1850-
// for Delta Lake and other connectors that use GCS-prefixed parameter names.
1851-
params.parameters.canonicalize_gcs_fragments();
1852-
1853-
let unprefixed = params
1854-
.parameters
1855-
.into_iter()
1856-
.map(|(k, _)| k.as_str())
1857-
.collect::<Vec<_>>();
1858-
1859-
parsed.set_fragment(Some(
1860-
listing::build_fragments(&params.parameters, unprefixed).as_str(),
1861-
));
1862-
1863-
let listing_table_url = ListingTableUrl::parse(parsed)
1864-
.boxed()
1865-
.context(FailedToStartClusterExecutorSnafu)?;
1866-
1867-
let _ = rt
1868-
.df
1869-
.ctx
1870-
.runtime_env()
1871-
.object_store(listing_table_url)
1872-
.boxed()
1873-
.context(FailedToStartClusterExecutorSnafu)?;
1874-
1875-
tracing::info!("Configured object storage for Dataset {}", dataset.name);
18761845
}
18771846

18781847
Ok(())

crates/runtime/src/dataconnector/listing/connector.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,6 +1217,47 @@ impl<T: ListingTableConnector + Display> DataConnector for T {
12171217
}
12181218
}
12191219

1220+
async fn register_object_stores(
1221+
&self,
1222+
dataset: &Dataset,
1223+
runtime_env: &Arc<datafusion::execution::runtime_env::RuntimeEnv>,
1224+
) -> DataConnectorResult<()> {
1225+
let url = self.get_object_store_url(dataset, None)?;
1226+
if url.scheme() == "file" {
1227+
tracing::warn!(
1228+
"Dataset {} has a file:// scheme and may not be resolvable on cluster executors without a shared mount.",
1229+
dataset.name
1230+
);
1231+
return Ok(());
1232+
}
1233+
1234+
let listing_url = ListingTableUrl::parse(url).boxed().context(
1235+
crate::dataconnector::UnableToConnectInternalSnafu {
1236+
dataconnector: format!("{self}"),
1237+
connector_component: ConnectorComponent::from(dataset),
1238+
},
1239+
)?;
1240+
1241+
// Triggers SpiceObjectStoreRegistry::get_store, which builds an object
1242+
// store from the URL fragment params (already secret-expanded by
1243+
// ConnectorParamsBuilder when the connector was created) and registers
1244+
// it on the runtime env keyed by the bare URL.
1245+
runtime_env.object_store(&listing_url).boxed().context(
1246+
crate::dataconnector::UnableToConnectInternalSnafu {
1247+
dataconnector: format!("{self}"),
1248+
connector_component: ConnectorComponent::from(dataset),
1249+
},
1250+
)?;
1251+
1252+
let mut redacted = <ListingTableUrl as AsRef<url::Url>>::as_ref(&listing_url).clone();
1253+
redacted.set_fragment(None);
1254+
tracing::debug!(
1255+
"Configured object storage for Dataset {} ({redacted})",
1256+
dataset.name,
1257+
);
1258+
Ok(())
1259+
}
1260+
12201261
/// A hook that is called when an accelerated table is registered to the
12211262
/// `DataFusion` context for this data connector.
12221263
///

0 commit comments

Comments
 (0)