Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1fe8568
etl: Use ADBC bulk ingest for AdbcSink inserts
phillipleblanc Feb 19, 2026
070f0db
chore: auto-fix cargo fmt + clippy
github-actions[bot] Feb 19, 2026
106cc2e
feat: Add normalization for Utf8View columns in RecordBatch
lukekim Feb 19, 2026
f39ac38
chore: auto-fix cargo fmt + clippy
github-actions[bot] Feb 19, 2026
7d1387c
Merge branch 'trunk' into phillip/260219-adbc-bulk-ingest
lukekim Feb 20, 2026
565a806
ci: Add validation run job to PR workflow (Spice Cloud TPCH sf0.01)
lukekim Feb 20, 2026
0221bbb
Merge branch 'trunk' into phillip/260219-adbc-bulk-ingest
lukekim Feb 20, 2026
55562ce
Merge branch 'trunk' into phillip/260219-adbc-bulk-ingest
lukekim Feb 20, 2026
ea61199
ci: Add AWS credentials to spicebench step in validation run
lukekim Feb 20, 2026
c8723da
ci: Pass Iceberg and AWS env vars to spidapter container
lukekim Feb 20, 2026
4e11e3c
ci: Use SF 0.001 for validation run
lukekim Feb 20, 2026
7c5f7d4
fix: Chunk bulk ingest batches and add Utf8View/Utf8 schema equivalence
lukekim Feb 20, 2026
309d7ac
ci: Use SF 0.0001 for validation run
lukekim Feb 20, 2026
f471174
fix: Add retry with backoff for transient bulk ingest failures
lukekim Feb 20, 2026
17479ee
chore: auto-fix cargo fmt + clippy
github-actions[bot] Feb 20, 2026
88dcd83
perf: Use bind_stream for bulk ingest with 500K-row chunks
lukekim Feb 20, 2026
99e2064
fix: Restore retry with backoff for transient bulk ingest failures
lukekim Feb 20, 2026
1d8fe90
ci: Add concurrency group to ensure only one spicebench run at a time
lukekim Feb 20, 2026
21a55f5
Merge origin/trunk into phillip/260219-adbc-bulk-ingest
lukekim Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions crates/adbc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ limitations under the License.
pub mod databricks;
pub mod spiceai;

pub use adbc_core::options::IngestMode;

use std::collections::HashMap;

use adbc_core::options::{AdbcVersion, OptionDatabase, OptionValue};
use adbc_core::{Connection, Database, Driver, LOAD_FLAG_DEFAULT, Statement};
use adbc_core::options::{self, AdbcVersion, OptionDatabase, OptionValue};
use adbc_core::{Connection, Database, Driver, Optionable, LOAD_FLAG_DEFAULT, Statement};
use adbc_driver_manager::ManagedDriver;
use arrow_array::RecordBatch;
use snafu::prelude::*;
Expand Down Expand Up @@ -118,4 +120,47 @@ impl AdbcConnection {
.collect::<std::result::Result<Vec<_>, _>>()
.context(ReadBatchSnafu)
}

/// Bulk-ingest a [`RecordBatch`] into a target table using the ADBC bulk
/// ingest API.
///
/// This binds the batch directly to a statement configured with the
/// target table and ingest mode, avoiding the overhead of constructing
/// individual SQL INSERT statements.
pub fn bulk_ingest(
&mut self,
target_table: &str,
target_db_schema: Option<&str>,
mode: options::IngestMode,
batch: RecordBatch,
) -> Result<Option<i64>> {
let mut stmt = self.conn.new_statement().map_err(|e| Error::ExecuteQuery {
reason: e.to_string(),
})?;

stmt.set_option(options::OptionStatement::TargetTable, OptionValue::from(target_table))
.map_err(|e| Error::ExecuteQuery {
reason: format!("Failed to set target table: {e}"),
})?;

if let Some(schema) = target_db_schema {
stmt.set_option(options::OptionStatement::TargetDbSchema, OptionValue::from(schema))
.map_err(|e| Error::ExecuteQuery {
reason: format!("Failed to set target db schema: {e}"),
})?;
}

stmt.set_option(options::OptionStatement::IngestMode, mode.into())
.map_err(|e| Error::ExecuteQuery {
reason: format!("Failed to set ingest mode: {e}"),
})?;

stmt.bind(batch).map_err(|e| Error::ExecuteQuery {
reason: format!("Failed to bind batch for bulk ingest: {e}"),
})?;

stmt.execute_update().map_err(|e| Error::ExecuteQuery {
reason: format!("Bulk ingest execution failed: {e}"),
})
}
}
216 changes: 66 additions & 150 deletions crates/etl/src/sink/adbc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,42 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

use adbc_client::AdbcConnection;
use adbc_client::{AdbcConnection, IngestMode};
use arrow::array::{
Array, ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray, StringViewArray,
TimestampMicrosecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
};
use arrow::datatypes::{DataType, Schema};
use arrow::datatypes::DataType;
use async_trait::async_trait;
use chrono::{Duration, NaiveDate};
use tokio::sync::Mutex as TokioMutex;

use super::{InsertOp, Sink};

const DEFAULT_INSERT_ROWS_PER_STATEMENT: usize = 2048;

/// ETL sink that writes transformed batches directly into the SUT via ADBC SQL.
/// ETL sink that writes transformed batches directly into the SUT via ADBC.
///
/// For `Insert` operations this sink uses the ADBC bulk ingest API, which
/// binds Arrow `RecordBatch` data directly to a statement – avoiding the
/// overhead of constructing individual SQL INSERT statements.
///
/// This sink appends rows with batched `INSERT INTO ... VALUES` statements.
/// Table auto-creation is optional and can be disabled when tables are managed
/// externally (for example by a system adapter RPC method).
/// Tables are expected to be pre-created (e.g. via a system adapter
/// `create_table` RPC), so the sink always uses [`IngestMode::Append`].
///
/// `Update` and `Delete` operations fall back to per-row SQL because
/// the ADBC bulk ingest API only supports append semantics.
pub struct AdbcSink {
/// ADBC connection behind a std `Mutex`; the `Arc` is cloned into
/// `spawn_blocking` tasks, which lock it for the duration of the call.
conn: Arc<Mutex<AdbcConnection>>,
/// Names of tables recorded after their `CREATE TABLE` preamble has been
/// executed, so the preamble is not emitted again for them.
created_tables: TokioMutex<HashSet<String>>,
/// Optional schema name; passed as the target schema for bulk ingest.
schema_name: Option<String>,
/// Upper bound on the number of rows packed into one
/// `INSERT INTO ... VALUES` statement.
insert_rows_per_statement: usize,
/// When `true`, a `CREATE TABLE IF NOT EXISTS` statement is queued for
/// tables not yet present in `created_tables`.
auto_create_tables: bool,
}

impl AdbcSink {
/// Creates a sink that owns `conn` and creates missing tables on demand.
///
/// The connection is wrapped for sharing with blocking tasks, the set of
/// already-created tables starts empty, and the per-statement row limit is
/// `DEFAULT_INSERT_ROWS_PER_STATEMENT`.
#[must_use]
pub fn new(conn: AdbcConnection, schema_name: Option<String>) -> Self {
    let shared_conn = Arc::new(Mutex::new(conn));
    let no_tables_yet = TokioMutex::new(HashSet::new());

    Self {
        conn: shared_conn,
        created_tables: no_tables_yet,
        schema_name,
        insert_rows_per_statement: DEFAULT_INSERT_ROWS_PER_STATEMENT,
        auto_create_tables: true,
    }
}

#[must_use]
pub fn new_without_table_creation(conn: AdbcConnection, schema_name: Option<String>) -> Self {
Self {
conn: Arc::new(Mutex::new(conn)),
created_tables: TokioMutex::new(HashSet::new()),
schema_name,
insert_rows_per_statement: DEFAULT_INSERT_ROWS_PER_STATEMENT,
auto_create_tables: false,
}
}

Expand All @@ -77,23 +62,6 @@ impl AdbcSink {
}
}

/// Renders a `CREATE TABLE IF NOT EXISTS` statement for `table_name` with
/// one column definition per field of `schema`, in schema order.
///
/// Column names go through `quote_identifier` and column types come from
/// `sql_type_for_arrow`.
///
/// # Errors
///
/// Returns the first error produced by `sql_type_for_arrow` when a field
/// has an Arrow type it does not map.
fn create_table_sql(&self, table_name: &str, schema: &Schema) -> anyhow::Result<String> {
    let fields = schema.fields();
    let mut column_defs = Vec::with_capacity(fields.len());

    for field in fields.iter() {
        // Resolve the type first so an unsupported type aborts before any
        // further work is done for this field.
        let sql_type = sql_type_for_arrow(field.data_type())?;
        let column_name = quote_identifier(field.name());
        column_defs.push(format!("{column_name} {sql_type}"));
    }

    let column_list = column_defs.join(", ");
    let target = self.table_identifier(table_name);
    Ok(format!("CREATE TABLE IF NOT EXISTS {target} ({column_list})"))
}

async fn execute_sql_batch(&self, statements: Vec<String>) -> anyhow::Result<()> {
let conn = Arc::clone(&self.conn);
tokio::task::spawn_blocking(move || {
Expand All @@ -110,26 +78,29 @@ impl AdbcSink {
.await?
}

fn insert_sql_for_rows(
async fn bulk_ingest_batch(
&self,
table_name: &str,
batch: &RecordBatch,
row_range: std::ops::Range<usize>,
) -> anyhow::Result<String> {
let mut tuples = Vec::with_capacity(row_range.len());
for row_idx in row_range {
let mut values = Vec::with_capacity(batch.num_columns());
for (column, field) in batch.columns().iter().zip(batch.schema().fields()) {
values.push(sql_literal_for_value(column, field.data_type(), row_idx)?);
}
tuples.push(format!("({})", values.join(", ")));
}

Ok(format!(
"INSERT INTO {} VALUES {}",
self.table_identifier(table_name),
tuples.join(", ")
))
batch: RecordBatch,
) -> anyhow::Result<()> {
let conn = Arc::clone(&self.conn);
let target_table = table_name.to_string();
let target_schema = self.schema_name.clone();
tokio::task::spawn_blocking(move || {
let mut guard = conn
.lock()
.map_err(|e| anyhow::anyhow!("ADBC connection lock poisoned: {e}"))?;
guard
.bulk_ingest(
&target_table,
target_schema.as_deref(),
IngestMode::Append,
batch,
)
.map_err(|e| anyhow::anyhow!("ADBC bulk ingest failed: {e}"))?;
Ok::<_, anyhow::Error>(())
})
.await?
}
}

Expand All @@ -142,84 +113,46 @@ impl Sink for AdbcSink {
batch: RecordBatch,
op: InsertOp,
) -> anyhow::Result<()> {
let mut preamble_statements: Vec<String> = Vec::new();
let should_ensure_table = matches!(op, InsertOp::Insert | InsertOp::Update { .. });
let mut newly_created = false;
match op {
InsertOp::Insert => {
if batch.num_rows() == 0 {
return Ok(());
}

if should_ensure_table && self.auto_create_tables {
let created = self.created_tables.lock().await;
if !created.contains(table_name) {
preamble_statements.push(self.create_table_sql(table_name, &batch.schema())?);
newly_created = true;
self.bulk_ingest_batch(table_name, batch).await?;
}
}

let num_rows = batch.num_rows();
let mut dml_statements: Vec<String> = Vec::new();
if num_rows > 0 {
match &op {
InsertOp::Insert => {
let mut start = 0usize;
while start < num_rows {
let end = std::cmp::min(start + self.insert_rows_per_statement, num_rows);
dml_statements.push(self.insert_sql_for_rows(
table_name,
&batch,
start..end,
)?);
start = end;
}
}
InsertOp::Update { key_columns } => {
let key_indexes = key_column_indexes(&batch, key_columns)?;
for row_idx in 0..num_rows {
dml_statements.push(self.update_sql_for_row(
table_name,
&batch,
row_idx,
&key_indexes,
)?);
}
InsertOp::Update { ref key_columns } => {
if batch.num_rows() == 0 {
return Ok(());
}
InsertOp::Delete { key_columns } => {
let key_indexes = key_column_indexes(&batch, key_columns)?;
for row_idx in 0..num_rows {
dml_statements.push(self.delete_sql_for_row(
table_name,
&batch,
row_idx,
&key_indexes,
)?);
}
let key_indexes = key_column_indexes(&batch, key_columns)?;
let mut statements = Vec::new();
for row_idx in 0..batch.num_rows() {
statements.push(self.update_sql_for_row(
table_name,
&batch,
row_idx,
&key_indexes,
)?);
}
self.execute_sql_batch(statements).await?;
}
}

if dml_statements.is_empty() {
if !preamble_statements.is_empty() {
self.execute_sql_batch(preamble_statements).await?;
if newly_created {
let mut created = self.created_tables.lock().await;
created.insert(table_name.to_string());
InsertOp::Delete { ref key_columns } => {
if batch.num_rows() == 0 {
return Ok(());
}
let key_indexes = key_column_indexes(&batch, key_columns)?;
let mut statements = Vec::new();
for row_idx in 0..batch.num_rows() {
statements.push(self.delete_sql_for_row(
table_name,
&batch,
row_idx,
&key_indexes,
)?);
}
self.execute_sql_batch(statements).await?;
}
return Ok(());
}

let mut tx_statements = preamble_statements.clone();
tx_statements.push("BEGIN".to_string());
tx_statements.extend(dml_statements.clone());
tx_statements.push("COMMIT".to_string());

if self.execute_sql_batch(tx_statements).await.is_err() {
let mut fallback_statements = preamble_statements;
fallback_statements.extend(dml_statements);
self.execute_sql_batch(fallback_statements).await?;
}

if newly_created {
let mut created = self.created_tables.lock().await;
created.insert(table_name.to_string());
}

Ok(())
Expand Down Expand Up @@ -320,23 +253,6 @@ fn quote_string_literal(value: &str) -> String {
format!("'{}'", value.replace('\'', "''"))
}

/// Maps an Arrow [`DataType`] to the SQL column type name used when this
/// sink generates DDL.
///
/// # Errors
///
/// Fails with an "Unsupported Arrow data type" error for any type that has
/// no arm below.
fn sql_type_for_arrow(data_type: &DataType) -> anyhow::Result<String> {
    let sql_type = match data_type {
        DataType::Boolean => "BOOLEAN",
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::UInt8
        | DataType::UInt16 => "INT",
        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => "BIGINT",
        DataType::Float32 => "FLOAT",
        DataType::Float64 => "DOUBLE",
        DataType::Utf8 | DataType::LargeUtf8 => "STRING",
        DataType::Date32 => "DATE",
        DataType::Timestamp(_, _) => "TIMESTAMP",
        // Decimal is the only mapping that needs formatting, so it returns
        // directly instead of yielding a static name.
        DataType::Decimal128(precision, scale) => {
            return Ok(format!("DECIMAL({precision}, {scale})"));
        }
        unsupported => {
            anyhow::bail!("Unsupported Arrow data type for ADBC sink: {unsupported:?}")
        }
    };

    Ok(sql_type.to_owned())
}

fn sql_literal_for_value(
column: &ArrayRef,
data_type: &DataType,
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn run_benchmark(
})?;
println!("ADBC connection established (driver: {})", driver_name);

let target = Arc::new(AdbcSink::new_without_table_creation(adbc_conn, None));
let target = Arc::new(AdbcSink::new(adbc_conn, None));

let dataset_source = DatasetSource::from_dataset_type(&version_metadata.dataset_type)?;
let generation_config = version_metadata.dataset_config();
Expand Down
Loading