Skip to content

Commit 429feb5

Browse files
authored
Use cte based approach to create managed Databricks table (#120)
1 parent 50880a2 commit 429feb5

3 files changed

Lines changed: 46 additions & 24 deletions

File tree

src/args/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ pub struct CommonArgs {
101101
pub(crate) system_adapter_env: Vec<(String, String)>,
102102

103103
/// S3 bucket name for the ETL source and target
104-
#[arg(long)]
104+
#[arg(long, default_value = "spiceai-public-datasets")]
105105
pub(crate) etl_bucket: String,
106106

107107
/// S3 key prefix (the `{prefix}` portion of `{prefix}/{scenario}/{version}/`)
108-
#[arg(long, default_value = "")]
108+
#[arg(long, default_value = "data-gen")]
109109
pub(crate) etl_prefix: String,
110110

111111
/// Version identifier for the data generation to read from.
112-
#[arg(long)]
112+
#[arg(long, default_value = "1")]
113113
pub(crate) etl_version: String,
114114

115115
/// Base S3 key prefix for the ETL target (rehydrated) data.
@@ -118,7 +118,7 @@ pub struct CommonArgs {
118118
pub(crate) etl_target_base_prefix: String,
119119

120120
/// AWS region for the ETL S3 bucket
121-
#[arg(long)]
121+
#[arg(long, default_value = "us-east-1")]
122122
pub(crate) etl_region: Option<String>,
123123

124124
/// S3 endpoint URL for the ETL bucket (for MinIO/LocalStack)

src/main.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,14 +221,27 @@ async fn main() -> anyhow::Result<()> {
221221

222222
let cli = Cli::parse();
223223

224-
// --- Connect to S3 and read version metadata ---
224+
let scenario_name = cli.common.scenario.to_string();
225+
let version_prefix = build_version_prefix(
226+
&cli.common.etl_prefix,
227+
&scenario_name,
228+
&cli.common.etl_version,
229+
);
230+
tracing::info!(
231+
etl_source = %format!("s3://{}/{}/tables/", cli.common.etl_bucket, version_prefix),
232+
etl_bucket = %cli.common.etl_bucket,
233+
etl_prefix = %cli.common.etl_prefix,
234+
etl_version = %cli.common.etl_version,
235+
etl_region = ?cli.common.etl_region,
236+
table_format = %cli.common.table_format,
237+
scenario = %scenario_name,
238+
concurrency = cli.common.concurrency,
239+
"ETL configuration"
240+
);
241+
225242
let source_config = TargetConfig {
226243
bucket: cli.common.etl_bucket.clone(),
227-
prefix: build_version_prefix(
228-
&cli.common.etl_prefix,
229-
&cli.common.scenario.to_string(),
230-
&cli.common.etl_version,
231-
),
244+
prefix: version_prefix,
232245
region: cli.common.etl_region.clone(),
233246
endpoint: cli.common.etl_endpoint.clone(),
234247
};

system-adapters/databricks/src/main.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ enum TableFormat {
162162
}
163163

164164
impl TableFormat {
165+
#[allow(dead_code)]
165166
fn as_sql_using(self) -> &'static str {
166167
match self {
167168
Self::Parquet => "PARQUET",
@@ -182,6 +183,7 @@ impl TableFormat {
182183

183184
#[derive(Debug, Clone)]
184185
struct RunState {
186+
#[allow(dead_code)]
185187
table_format: TableFormat,
186188
created_tables: Vec<String>,
187189
cluster_id: Option<String>,
@@ -384,6 +386,7 @@ impl DatabricksAdapter {
384386
)
385387
}
386388

389+
#[allow(dead_code)]
387390
fn sql_type_for_arrow(data_type: &DataType) -> Result<String> {
388391
match data_type {
389392
DataType::Boolean => Ok("BOOLEAN".to_string()),
@@ -408,6 +411,7 @@ impl DatabricksAdapter {
408411
}
409412
}
410413

414+
#[allow(dead_code)]
411415
fn create_table_ddl(
412416
&self,
413417
table_name: &str,
@@ -436,6 +440,23 @@ impl DatabricksAdapter {
436440
))
437441
}
438442

443+
/// Build a CTAS statement that creates the table by reading parquet files
444+
/// from S3.
445+
///
446+
/// ```sql
447+
/// CREATE OR REPLACE TABLE catalog.schema.table
448+
/// AS SELECT * FROM parquet.`s3://bucket/prefix/scenario/version/tables/table/`
449+
/// ```
///
/// The target name is whatever `self.lakebase_table_full_name(table_name)`
/// returns; `table_name` is the only value interpolated into the source URI.
/// The statement is returned as a `String` and is not executed here.
///
/// NOTE(review): the source location is currently the fixed string
/// `s3://spiceai-public-datasets/data-gen/tpch/4/tables/{table_name}/`, not the
/// generic `bucket/prefix/scenario/version` layout shown in the example above.
/// Confirm whether bucket, prefix, scenario and version should instead be
/// derived from the run's configuration.
///
/// NOTE(review): `table_name` is spliced into the SQL text without quoting or
/// escaping; presumably it is always a trusted identifier. Verify against
/// callers.
450+
fn create_table_ctas(&self, table_name: &str) -> String {
451+
// Directory holding this table's parquet files. Everything except the
// trailing table name is a literal in this string.
let source_uri = format!(
452+
"s3://spiceai-public-datasets/data-gen/tpch/4/tables/{table_name}/"
453+
);
454+
// `{}` receives the fully qualified target name; the source URI is wrapped
// in backticks directly after `parquet.`.
format!(
455+
"CREATE OR REPLACE TABLE {} AS SELECT * FROM parquet.`{source_uri}`",
456+
self.lakebase_table_full_name(table_name),
457+
)
458+
}
459+
439460
fn table_format_from_setup_metadata(
440461
&self,
441462
metadata: &HashMap<String, Value>,
@@ -1084,19 +1105,11 @@ impl Handler for DatabricksAdapter {
10841105
},
10851106
);
10861107

1087-
let table_format = {
1088-
let state = self
1089-
.runs
1090-
.get(&run_id)
1091-
.ok_or_else(|| format!("Unknown run_id: {run_id}"))?;
1092-
state.table_format
1093-
};
1094-
10951108
let mut created_tables = Vec::with_capacity(datasets.len());
10961109

10971110
match self.config.variant {
10981111
DatabricksVariant::Databricks | DatabricksVariant::Lakebase => {
1099-
for (table_name, dataset_cfg) in datasets {
1112+
for (table_name, _dataset_cfg) in datasets {
11001113
let drop_sql = format!(
11011114
"DROP TABLE IF EXISTS {}",
11021115
self.lakebase_table_full_name(&table_name)
@@ -1107,11 +1120,7 @@ impl Handler for DatabricksAdapter {
11071120
)
11081121
})?;
11091122

1110-
let create_sql = self
1111-
.create_table_ddl(&table_name, &dataset_cfg, table_format)
1112-
.map_err(|e| {
1113-
format!("Failed to build Lakebase table DDL for '{table_name}': {e}")
1114-
})?;
1123+
let create_sql = self.create_table_ctas(&table_name);
11151124

11161125
eprintln!("[databricks-adapter] create_table '{table_name}': {create_sql}");
11171126

0 commit comments

Comments
 (0)