Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions system-adapters/databricks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,16 +575,43 @@ impl DatabricksAdapter {
}
}

/// Parse a `bucket(N, col_name)` expression and return the column name.
fn parse_bucket_column(spec: &str) -> Option<String> {
let trimmed = spec.trim();
if !trimmed.starts_with("bucket(") || !trimmed.ends_with(')') {
return None;
}
let inner = &trimmed["bucket(".len()..trimmed.len() - 1];
let (_n, col) = inner.split_once(',')?;
Some(col.trim().to_string())
}

fn create_table_ddl(
&self,
table_name: &str,
dataset_cfg: &DatasetConfig,
table_format: TableFormat,
) -> Result<String> {
// Separate partition columns into plain (Hive) and bucket (clustering).
let mut plain_partition_cols: Vec<&str> = Vec::new();
let mut cluster_cols: Vec<String> = Vec::new();

for spec in &dataset_cfg.partition_columns {
if let Some(col_name) = Self::parse_bucket_column(spec) {
cluster_cols.push(col_name);
} else {
plain_partition_cols.push(spec.as_str());
}
}

let partition_set: std::collections::HashSet<&str> =
plain_partition_cols.iter().copied().collect();

let columns = dataset_cfg
.schema
.fields()
.iter()
.filter(|field| !partition_set.contains(field.name().as_str()))
.map(|field| {
let col_type = Self::sql_type_for_arrow(field.data_type())?;
Ok::<_, anyhow::Error>(format!(
Expand All @@ -596,11 +623,42 @@ impl DatabricksAdapter {
.collect::<Result<Vec<_>>>()?
.join(", ");

Ok(format!(
let mut ddl = format!(
"CREATE TABLE {} ({columns}) USING {}",
self.table_full_name(table_name),
table_format.as_sql_using()
))
);

use std::fmt::Write;

if !plain_partition_cols.is_empty() {
let partition_defs = plain_partition_cols
.iter()
.map(|col_name| {
let field =
dataset_cfg.schema.field_with_name(col_name).map_err(|_| {
anyhow!("Partition column '{col_name}' not found in schema for table '{table_name}'")
})?;
let col_type = Self::sql_type_for_arrow(field.data_type())?;
Ok::<_, anyhow::Error>(format!(
"{} {col_type}",
Self::quoted_identifier(col_name)
))
})
.collect::<Result<Vec<_>>>()?
.join(", ");
write!(&mut ddl, " PARTITIONED BY ({partition_defs})").unwrap();
}

if !cluster_cols.is_empty() {
let cluster_idents: Vec<String> = cluster_cols
.iter()
.map(|c| Self::quoted_identifier(c))
.collect();
write!(&mut ddl, " CLUSTER BY ({})", cluster_idents.join(", ")).unwrap();
}

Ok(ddl)
}

fn table_format_from_setup_metadata(
Expand Down
Loading