Skip to content

Commit 0e7da19

Browse files
authored
Batch UPDATE statements using staging table and MERGE INTO for Databricks (#194)
1 parent ce183ba commit 0e7da19

2 files changed

Lines changed: 227 additions & 15 deletions

File tree

.github/workflows/run_spicebench.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ jobs:
309309
310310
if [ "${SYSTEM_UNDER_TEST_PREFIX}" = "databricks" ]; then
311311
export SPICEBENCH_ADBC_DELETE_BATCH_SIZE=5000
312+
export SPICEBENCH_ADBC_UPDATE_STRATEGY=staging_table
312313
ADAPTER_CMD="${HOME}/.spice/bin/databricks-system-adapter"
313314
ADAPTER_ARGS="stdio"
314315
ADAPTER_ENVS="--system-adapter-env DATABRICKS_ENDPOINT=${DATABRICKS_ENDPOINT} --system-adapter-env DATABRICKS_TOKEN=${DATABRICKS_TOKEN} --system-adapter-env DATABRICKS_HTTP_PATH=${DATABRICKS_HTTP_PATH} --system-adapter-env DATABRICKS_SQL_WAREHOUSE_ID=${DATABRICKS_SQL_WAREHOUSE_ID} --system-adapter-env DATABRICKS_TABLE_FORMAT=${DATABRICKS_TABLE_FORMAT}"

crates/etl/src/sink/adbc.rs

Lines changed: 226 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,36 @@ const ADBC_SINK_POOL_SIZE_ENV: &str = "SPICEBENCH_ADBC_SINK_POOL_SIZE";
4242
/// When unset or empty, deletes use the original row-by-row approach.
4343
const ADBC_DELETE_BATCH_SIZE_ENV: &str = "SPICEBENCH_ADBC_DELETE_BATCH_SIZE";
4444

45+
/// Controls how UPDATE operations are executed.
46+
///
47+
/// - `statement` — row-by-row `UPDATE … SET … WHERE …` statements (default)
48+
/// - `staging_table` — bulk ingest into temp staging table + single `MERGE INTO`
49+
const ADBC_UPDATE_STRATEGY_ENV: &str = "SPICEBENCH_ADBC_UPDATE_STRATEGY";
50+
51+
/// Strategy for executing UPDATE operations.
52+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53+
enum UpdateStrategy {
54+
/// Row-by-row `UPDATE … SET … WHERE …` SQL statements.
55+
Statement,
56+
/// Bulk ingest into a temporary staging table, then `MERGE INTO target USING staging …`.
57+
StagingTable,
58+
}
59+
60+
impl UpdateStrategy {
61+
fn from_env() -> anyhow::Result<Self> {
62+
match std::env::var(ADBC_UPDATE_STRATEGY_ENV).ok().as_deref() {
63+
Some(val) => match val.to_lowercase().as_str() {
64+
"statement" => Ok(Self::Statement),
65+
"staging_table" => Ok(Self::StagingTable),
66+
other => anyhow::bail!(
67+
"Unknown update strategy '{other}'. Valid values for {ADBC_UPDATE_STRATEGY_ENV}: statement, staging_table"
68+
),
69+
},
70+
None => Ok(Self::Statement),
71+
}
72+
}
73+
}
74+
4575
/// ETL sink that writes transformed batches directly to an ADBC target.
4676
///
4777
/// Inserts use ADBC bulk ingest, while updates and deletes execute row-level
@@ -633,19 +663,124 @@ impl AdbcSink {
633663
Ok(statements)
634664
}
635665

636-
fn update_sql_statements(
666+
/// Perform an UPDATE via a temporary staging table:
667+
///
668+
/// 1. Bulk-ingest the update batch into a staging table.
669+
/// 2. `MERGE INTO target USING staging ON … WHEN MATCHED THEN UPDATE SET …`
670+
/// 3. `DROP TABLE staging`.
671+
fn staging_merge_update(
637672
&self,
673+
conn: &mut AdbcConnection,
638674
table_name: &str,
639-
batch: &RecordBatch,
675+
batch: RecordBatch,
640676
key_columns: &[String],
641-
) -> anyhow::Result<Vec<String>> {
677+
) -> anyhow::Result<()> {
642678
if key_columns.is_empty() {
643679
anyhow::bail!("Update requires at least one key column");
644680
}
645681

646-
(0..batch.num_rows())
647-
.map(|row| self.update_sql_for_row(table_name, batch, row, key_columns))
648-
.collect()
682+
let schema = batch.schema();
683+
let has_non_key = schema
684+
.fields()
685+
.iter()
686+
.any(|f| !key_columns.iter().any(|k| k == f.name()));
687+
if !has_non_key {
688+
anyhow::bail!(
689+
"Update requires at least one non-key column in batch schema for table '{table_name}'"
690+
);
691+
}
692+
693+
// Generate a unique staging table name.
694+
let ts = std::time::SystemTime::now()
695+
.duration_since(std::time::UNIX_EPOCH)
696+
.unwrap_or_default()
697+
.as_millis();
698+
let staging_table = format!("_spicebench_stg_{table_name}_{ts}");
699+
700+
// 1. Bulk-ingest batch into the staging table.
701+
if let Err(e) = self.ingest_insert_batch(conn, &staging_table, batch) {
702+
self.drop_staging_table(conn, &staging_table);
703+
return Err(e.context(format!(
704+
"Failed to ingest update data into staging table '{staging_table}'"
705+
)));
706+
}
707+
708+
// 2. MERGE INTO target from staging.
709+
let merge_sql = Self::build_staging_merge_sql(
710+
&self.target_table_identifier(table_name),
711+
&self.target_table_identifier(&staging_table),
712+
&schema,
713+
key_columns,
714+
self.identifier_quote_char,
715+
);
716+
let merge_result = conn
717+
.execute_update(&merge_sql)
718+
.map_err(|e| anyhow::anyhow!("MERGE INTO update failed for '{table_name}': {e}"));
719+
720+
// 3. Drop staging table (always, even on merge failure).
721+
self.drop_staging_table(conn, &staging_table);
722+
723+
merge_result?;
724+
Ok(())
725+
}
726+
727+
/// Best-effort drop of a staging table.
728+
fn drop_staging_table(&self, conn: &mut AdbcConnection, staging_table: &str) {
729+
let drop_sql = format!(
730+
"DROP TABLE IF EXISTS {}",
731+
self.target_table_identifier(staging_table)
732+
);
733+
734+
if let Err(e) = conn.execute_update(&drop_sql) {
735+
tracing::error!(
736+
staging_table = %staging_table,
737+
error = %e,
738+
"Failed to drop staging table. Manual cleanup may be required."
739+
);
740+
}
741+
}
742+
743+
/// Build a `MERGE INTO` statement that reads from a staging table.
744+
///
745+
/// ```sql
746+
/// MERGE INTO target t
747+
/// USING staging s
748+
/// ON t.key1 = s.key1
749+
/// WHEN MATCHED THEN UPDATE SET val1 = s.val1, val2 = s.val2
750+
/// ```
751+
fn build_staging_merge_sql(
752+
target_ident: &str,
753+
staging_ident: &str,
754+
schema: &Schema,
755+
key_columns: &[String],
756+
identifier_quote_char: char,
757+
) -> String {
758+
let on_clause: String = key_columns
759+
.iter()
760+
.map(|k| {
761+
let q = Self::quote_ident(k, identifier_quote_char);
762+
format!("t.{q} = s.{q}")
763+
})
764+
.collect::<Vec<_>>()
765+
.join(" AND ");
766+
767+
let set_clause: String = schema
768+
.fields()
769+
.iter()
770+
.filter(|f| !key_columns.iter().any(|k| k == f.name()))
771+
.map(|f| {
772+
let q = Self::quote_ident(f.name(), identifier_quote_char);
773+
format!("{q} = s.{q}")
774+
})
775+
.collect::<Vec<_>>()
776+
.join(", ");
777+
778+
format!(
779+
"MERGE INTO {target_ident} t \
780+
USING {staging_ident} s \
781+
ON {on_clause} \
782+
WHEN MATCHED THEN UPDATE SET {set_clause}"
783+
)
649784
}
650785
}
651786

@@ -697,24 +832,40 @@ impl Sink for AdbcSink {
697832
);
698833
}
699834
InsertOp::Update { key_columns } => {
700-
let statements = self.update_sql_statements(table_name, &batch, &key_columns)?;
701-
let num_statements = statements.len();
835+
let strategy = UpdateStrategy::from_env()?;
836+
let num_rows = batch.num_rows();
702837
let start = Instant::now();
703-
for sql in statements {
704-
conn.execute_update(&sql).map_err(|e| {
705-
anyhow::anyhow!("ADBC update execution failed for '{table_name}': {e}")
706-
})?;
838+
839+
match strategy {
840+
UpdateStrategy::StagingTable => {
841+
self.staging_merge_update(&mut conn, table_name, batch, &key_columns)?;
842+
}
843+
UpdateStrategy::Statement => {
844+
let statements: Vec<String> = (0..num_rows)
845+
.map(|row| {
846+
self.update_sql_for_row(table_name, &batch, row, &key_columns)
847+
})
848+
.collect::<anyhow::Result<Vec<_>>>()?;
849+
for sql in &statements {
850+
conn.execute_update(sql).map_err(|e| {
851+
anyhow::anyhow!(
852+
"ADBC update execution failed for '{table_name}': {e}"
853+
)
854+
})?;
855+
}
856+
}
707857
}
858+
708859
let elapsed = start.elapsed();
709860
let rows_per_sec = if elapsed.as_secs_f64() > 0.0 {
710-
batch.num_rows() as f64 / elapsed.as_secs_f64()
861+
num_rows as f64 / elapsed.as_secs_f64()
711862
} else {
712863
0.0
713864
};
714865
tracing::debug!(
715866
table = %table_name,
716-
rows = batch.num_rows(),
717-
num_statements,
867+
rows = num_rows,
868+
strategy = ?strategy,
718869
elapsed_ms = elapsed.as_millis(),
719870
rows_per_sec = format!("{rows_per_sec:.1}"),
720871
"UPDATE executed"
@@ -917,4 +1068,64 @@ mod tests {
9171068
.unwrap();
9181069
assert!(stmts.is_empty());
9191070
}
1071+
1072+
// ── STAGING MERGE UPDATE tests ───────────────────────────────────────
1073+
1074+
#[test]
1075+
fn staging_merge_sql_single_key() {
1076+
let schema = Schema::new(vec![
1077+
Field::new("id", DataType::Int64, false),
1078+
Field::new("name", DataType::Utf8, false),
1079+
Field::new("value", DataType::Float64, false),
1080+
]);
1081+
let key_columns = vec!["id".to_string()];
1082+
let sql = AdbcSink::build_staging_merge_sql(
1083+
r#""target""#,
1084+
r#""staging""#,
1085+
&schema,
1086+
&key_columns,
1087+
'"',
1088+
);
1089+
assert_eq!(
1090+
sql,
1091+
"MERGE INTO \"target\" t \
1092+
USING \"staging\" s \
1093+
ON t.\"id\" = s.\"id\" \
1094+
WHEN MATCHED THEN UPDATE SET \"name\" = s.\"name\", \"value\" = s.\"value\""
1095+
);
1096+
}
1097+
1098+
#[test]
1099+
fn staging_merge_sql_composite_key() {
1100+
let schema = Schema::new(vec![
1101+
Field::new("a", DataType::Int64, false),
1102+
Field::new("b", DataType::Int32, false),
1103+
Field::new("val", DataType::Utf8, false),
1104+
]);
1105+
let key_columns = vec!["a".to_string(), "b".to_string()];
1106+
let sql =
1107+
AdbcSink::build_staging_merge_sql(r#""t""#, r#""stg""#, &schema, &key_columns, '"');
1108+
assert!(sql.contains(r#"ON t."a" = s."a" AND t."b" = s."b""#));
1109+
assert!(sql.contains(r#"UPDATE SET "val" = s."val""#));
1110+
}
1111+
1112+
#[test]
1113+
fn staging_merge_sql_databricks_backticks() {
1114+
let schema = Schema::new(vec![
1115+
Field::new("id", DataType::Int64, false),
1116+
Field::new("name", DataType::Utf8, false),
1117+
]);
1118+
let key_columns = vec!["id".to_string()];
1119+
let sql = AdbcSink::build_staging_merge_sql(
1120+
"`catalog`.`schema`.`target`",
1121+
"`catalog`.`schema`.`staging`",
1122+
&schema,
1123+
&key_columns,
1124+
'`',
1125+
);
1126+
assert!(sql.starts_with("MERGE INTO `catalog`.`schema`.`target` t"));
1127+
assert!(sql.contains("USING `catalog`.`schema`.`staging` s"));
1128+
assert!(sql.contains("ON t.`id` = s.`id`"));
1129+
assert!(sql.contains("`name` = s.`name`"));
1130+
}
9201131
}

0 commit comments

Comments
 (0)