Skip to content

Commit 2c2c43c

Browse files
krinartlukekim
andauthored
Handle foreign table + Classic sql warehouse combination gracefully (spiceai#10318)
* Handle foreign table + Classic sql warehouse combination gracefully * fix: Improve error messages for Databricks connector and SQL Warehouse * fix: Improve error messages in Databricks SQL Warehouse connector * fix: Enhance error handling and messaging for Databricks SQL Warehouse integration * fix: Format code for better readability in Databricks error handling tests * feat: Add acceleration_options module with shared store functionality * fix: Update DDL handling by removing deprecated acceleration_options module and refactoring related code * Fix + tests * Fix + Tests * Fix * Apply suggestion from @lukekim * Apply suggestions from code review Co-authored-by: Luke Kim <80174+lukekim@users.noreply.github.com> * Revert workflow changes * Fix integration tests --------- Co-authored-by: Luke Kim <80174+lukekim@users.noreply.github.com>
1 parent 87bc2c7 commit 2c2c43c

6 files changed

Lines changed: 1028 additions & 83 deletions

File tree

crates/data-connectors/connector-databricks/src/lib.rs

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub enum Error {
9797
UnableToConstructDatabricksSqlWarehouse { source: sql_warehouse::Error },
9898

9999
#[snafu(display(
100-
"Invalid `mode` value: '{value}'. Use 'delta_lake' or 'spark_connect'. For details, visit: https://spiceai.org/docs/components/data-connectors/databricks#parameters"
100+
"Invalid `mode` value: '{value}'. Valid modes are 'sql_warehouse', 'delta_lake', and 'spark_connect'. For details, visit: https://spiceai.org/docs/components/data-connectors/databricks#parameters"
101101
))]
102102
InvalidMode { value: String },
103103

@@ -107,12 +107,12 @@ pub enum Error {
107107
InvalidConfiguration { message: String },
108108

109109
#[snafu(display(
110-
"Failed to build Databricks connector: required component '{missing_component}' is missing. An unexpected error occurred. Report a bug to request support: https://github.com/spiceai/spiceai/issues"
110+
"Failed to build Databricks connector. An unexpected internal error occurred. Report a bug on GitHub: https://github.com/spiceai/spiceai/issues"
111111
))]
112112
UnableToBuild { missing_component: String },
113113

114114
#[snafu(display(
115-
"Failed to obtain Databricks service principal token for machine-to-machine authentication. {source}"
115+
"Failed to obtain Databricks authentication token. {source} Verify the service principal credentials are correctly configured. For details, visit: https://spiceai.org/docs/components/data-connectors/databricks#parameters"
116116
))]
117117
UnableToGetToken {
118118
source: Box<dyn std::error::Error + Send + Sync>,
@@ -841,11 +841,7 @@ impl DataConnector for Databricks {
841841
self.read_provider
842842
.table_provider(table_reference)
843843
.await
844-
.map_err(|source| DataConnectorError::UnableToGetReadProvider {
845-
dataconnector: "databricks".to_string(),
846-
connector_component: ConnectorComponent::from(dataset),
847-
source,
848-
})
844+
.map_err(|source| classify_table_provider_error(dataset, source))
849845
}
850846

851847
fn initialization(&self) -> ComponentInitialization {
@@ -861,6 +857,71 @@ impl DataConnector for Databricks {
861857
}
862858
}
863859

860+
/// Classifies a table-provider error, promoting Databricks-specific
861+
/// configuration failures (e.g. foreign tables on Classic SQL warehouses)
862+
/// into permanent, non-retriable errors so the runtime surfaces them
863+
/// immediately instead of retrying indefinitely.
864+
fn classify_table_provider_error(
865+
dataset: &Dataset,
866+
source: Box<dyn std::error::Error + Send + Sync>,
867+
) -> DataConnectorError {
868+
if let Some(message) = databricks_invalid_configuration_message(&*source) {
869+
return DataConnectorError::InvalidConfigurationNoSource {
870+
dataconnector: "databricks".to_string(),
871+
connector_component: ConnectorComponent::from(dataset),
872+
message,
873+
};
874+
}
875+
876+
if is_permission_denied_in_chain(&*source) {
877+
return DataConnectorError::InsufficientPermissions {
878+
dataconnector: "databricks".to_string(),
879+
connector_component: ConnectorComponent::from(dataset),
880+
source,
881+
};
882+
}
883+
884+
DataConnectorError::UnableToGetReadProvider {
885+
dataconnector: "databricks".to_string(),
886+
connector_component: ConnectorComponent::from(dataset),
887+
source,
888+
}
889+
}
890+
891+
/// Walks the error chain looking for permission-denied signals from the
892+
/// `discover_schema` path (e.g. `PERMISSION_DENIED` on `DESCRIBE TABLE`).
893+
fn is_permission_denied_in_chain(source: &(dyn std::error::Error + 'static)) -> bool {
894+
let mut current: Option<&(dyn std::error::Error + 'static)> = Some(source);
895+
while let Some(err) = current {
896+
let msg = err.to_string();
897+
if msg.contains("Access denied") || msg.contains("PERMISSION_DENIED") {
898+
return true;
899+
}
900+
current = err.source();
901+
}
902+
false
903+
}
904+
905+
fn databricks_invalid_configuration_message(
906+
source: &(dyn std::error::Error + 'static),
907+
) -> Option<String> {
908+
let mut current: Option<&(dyn std::error::Error + 'static)> = Some(source);
909+
while let Some(err) = current {
910+
if let Some(error) = err.downcast_ref::<sql_warehouse::Error>() {
911+
match error {
912+
sql_warehouse::Error::ForeignTableOnClassicWarehouse { .. }
913+
| sql_warehouse::Error::UnsupportedDataSource { .. } => {
914+
return Some(error.to_string());
915+
}
916+
_ => {}
917+
}
918+
}
919+
current = err.source();
920+
}
921+
922+
None
923+
}
924+
864925
// ============================================================================
865926
// Databricks Metrics Provider
866927
// ============================================================================
@@ -1573,6 +1634,46 @@ mod tests {
15731634
)
15741635
}
15751636

1637+
#[tokio::test]
1638+
async fn test_classify_table_provider_error_matches_typed_foreign_table_error_in_chain() {
1639+
let dataset =
1640+
make_dataset("databricks:catalog.schema.foreign_table", "foreign_table").await;
1641+
let source: Box<dyn std::error::Error + Send + Sync> =
1642+
Box::new(sql_warehouse::Error::ForeignTableOnClassicWarehouse {
1643+
dataset_name: "catalog.schema.foreign_table".to_string(),
1644+
message: "[UNSUPPORTED_DATA_SOURCE] foreign tables require Pro or Serverless"
1645+
.to_string(),
1646+
});
1647+
1648+
let err = classify_table_provider_error(&dataset, source);
1649+
1650+
match err {
1651+
DataConnectorError::InvalidConfigurationNoSource { message, .. } => {
1652+
assert!(
1653+
message.contains("Lakehouse Federation foreign table"),
1654+
"expected the typed Databricks error message: {message}"
1655+
);
1656+
}
1657+
other => panic!("unexpected error classification: {other}"),
1658+
}
1659+
}
1660+
1661+
#[tokio::test]
1662+
async fn test_classify_table_provider_error_does_not_match_plain_text_only() {
1663+
let dataset =
1664+
make_dataset("databricks:catalog.schema.foreign_table", "foreign_table").await;
1665+
let source: Box<dyn std::error::Error + Send + Sync> = Box::new(std::io::Error::other(
1666+
"Lakehouse Federation foreign table is unsupported on Classic SQL warehouses",
1667+
));
1668+
1669+
let err = classify_table_provider_error(&dataset, source);
1670+
1671+
assert!(
1672+
matches!(err, DataConnectorError::UnableToGetReadProvider { .. }),
1673+
"plain text matches should not be promoted without the typed Databricks error"
1674+
);
1675+
}
1676+
15761677
fn assert_request_seen(captured_requests: &[String], path_fragment: &str) {
15771678
assert!(
15781679
captured_requests

0 commit comments

Comments
 (0)