Skip to content

Commit 7902c7b

Browse files
phillipleblancgithub-actions[bot]lukekim
authored
etl: Use ADBC bulk ingest for AdbcSink inserts (#106)
* etl: Use ADBC bulk ingest for AdbcSink inserts Replace per-row SQL INSERT statement construction with the ADBC bulk ingest API (Statement::bind + execute_update with IngestMode::Append). This sends Arrow RecordBatch data directly to the driver without serializing each cell to a SQL literal, which is significantly faster for large batches. Tables are pre-created via the system adapter create_table RPC, so the sink always uses IngestMode::Append. The auto_create_tables flag and created_tables tracking are removed. Update and Delete operations continue to use per-row SQL as the ADBC bulk ingest API only supports append semantics. * chore: auto-fix cargo fmt + clippy * feat: Add normalization for Utf8View columns in RecordBatch * chore: auto-fix cargo fmt + clippy * ci: Add validation run job to PR workflow (Spice Cloud TPCH sf0.01) * ci: Add AWS credentials to spicebench step in validation run * ci: Pass Iceberg and AWS env vars to spidapter container * ci: Use SF 0.001 for validation run * fix: Chunk bulk ingest batches and add Utf8View/Utf8 schema equivalence * ci: Use SF 0.0001 for validation run * fix: Add retry with backoff for transient bulk ingest failures * chore: auto-fix cargo fmt + clippy * perf: Use bind_stream for bulk ingest with 500K-row chunks - Add bulk_ingest_stream() to AdbcConnection using bind_stream API to send all chunks in a single ADBC statement - Increase chunk size from 65K to 500K rows to reduce round trips - Single spawn_blocking + mutex acquisition per table instead of per chunk - Eliminates per-chunk retry overhead (stream handles batching internally) * fix: Restore retry with backoff for transient bulk ingest failures * ci: Add concurrency group to ensure only one spicebench run at a time --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Luke Kim <80174+lukekim@users.noreply.github.com>
1 parent 5849e1c commit 7902c7b

5 files changed

Lines changed: 325 additions & 82 deletions

File tree

.github/workflows/pr.yml

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,79 @@ jobs:
8484
git add -A
8585
git commit -m "chore: auto-fix cargo fmt + clippy"
8686
git push
87+
88+
validation-run:
89+
name: Validation run (Spice Cloud TPCH sf0.001)
90+
runs-on: ubuntu-latest
91+
timeout-minutes: 30
92+
needs: changes
93+
if: needs.changes.outputs.rust == 'true'
94+
concurrency:
95+
group: spicebench-run
96+
cancel-in-progress: true
97+
steps:
98+
- uses: actions/checkout@v6
99+
100+
- uses: ./.github/actions/management-login
101+
with:
102+
client-id: ${{ secrets.SPICE_MANAGEMENT_CLIENT_ID }}
103+
client-secret: ${{ secrets.SPICE_MANAGEMENT_CLIENT_SECRET }}
104+
105+
- name: Log in to GHCR
106+
uses: docker/login-action@v3
107+
with:
108+
registry: ghcr.io
109+
username: ${{ github.actor }}
110+
password: ${{ secrets.GITHUB_TOKEN }}
111+
112+
- name: Pull spidapter image
113+
run: docker pull ghcr.io/spiceai/spidapter:latest
114+
115+
- uses: ./.github/actions/build-spicebench
116+
117+
- name: Build data-generation
118+
run: |
119+
mkdir -p ~/.spice/bin
120+
cargo build -p data-generation
121+
install -m 755 target/debug/data-generation ~/.spice/bin/data-generation
122+
123+
- name: Generate test data (sf 0.01)
124+
env:
125+
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
126+
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
127+
RUST_LOG: info
128+
run: |
129+
~/.spice/bin/data-generation run \
130+
--dataset tpch \
131+
--scenario tpch \
132+
--version 0 \
133+
--scale-factor 0.0001 \
134+
--bucket spiceai-public-datasets \
135+
--prefix pr-validation \
136+
--max-concurrency 4 \
137+
--region us-east-1
138+
139+
- name: Install ADBC driver
140+
run: |
141+
curl -LsSf https://dbc.columnar.tech/install.sh | sh
142+
dbc install flightsql
143+
144+
- name: Run spicebench
145+
env:
146+
SPICEAI_API_KEY: ${{ env.SPICEAI_API_KEY }}
147+
SPICE_CLOUD_API_URL: https://dev-api.spice.ai
148+
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
149+
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
150+
SPIDAPTER_ICEBERG_REGION: us-west-1
151+
SPIDAPTER_ICEBERG_CATALOG_FROM: iceberg:https://glue.us-west-1.amazonaws.com/iceberg/v1/catalogs/211125479522/namespaces
152+
RUST_LOG: info
153+
run: |
154+
~/.spice/bin/spicebench \
155+
--concurrency 1 \
156+
--scenario tpch \
157+
--etl-bucket spiceai-public-datasets \
158+
--etl-prefix pr-validation \
159+
--etl-version 0 \
160+
--etl-region us-east-1 \
161+
--system-adapter-stdio-cmd docker \
162+
--system-adapter-stdio-args "run -i -e SPICEAI_API_KEY -e SPICE_CLOUD_API_URL -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e SPIDAPTER_ICEBERG_REGION -e SPIDAPTER_ICEBERG_CATALOG_FROM ghcr.io/spiceai/spidapter:latest stdio --verbose --channel nightly"

.github/workflows/run_spicebench.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ on:
3939
description: 'Version identifier for the data generation to read from'
4040
required: true
4141
type: string
42-
default: "1"
42+
default: '1'
4343
etl_region:
4444
description: 'AWS region for the ETL S3 bucket'
4545
required: false
@@ -70,6 +70,8 @@ jobs:
7070
name: Run spicebench
7171
runs-on: ubuntu-latest
7272
timeout-minutes: 600
73+
concurrency:
74+
group: spicebench-run
7375
steps:
7476
- uses: actions/checkout@v6
7577

crates/adbc_client/src/lib.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ limitations under the License.
1717
pub mod databricks;
1818
pub mod spiceai;
1919

20+
pub use adbc_core::options::IngestMode;
21+
2022
use std::collections::HashMap;
2123

22-
use adbc_core::options::{AdbcVersion, OptionDatabase, OptionValue};
23-
use adbc_core::{Connection, Database, Driver, LOAD_FLAG_DEFAULT, Statement};
24+
use adbc_core::options::{self, AdbcVersion, OptionDatabase, OptionValue};
25+
use adbc_core::{Connection, Database, Driver, LOAD_FLAG_DEFAULT, Optionable, Statement};
2426
use adbc_driver_manager::ManagedDriver;
2527
use arrow_array::RecordBatch;
2628
use snafu::prelude::*;
@@ -118,4 +120,76 @@ impl AdbcConnection {
118120
.collect::<std::result::Result<Vec<_>, _>>()
119121
.context(ReadBatchSnafu)
120122
}
123+
124+
/// Bulk-ingest a [`RecordBatch`] into a target table using the ADBC bulk
125+
/// ingest API.
126+
///
127+
/// This binds the batch directly to a statement configured with the
128+
/// target table and ingest mode, avoiding the overhead of constructing
129+
/// individual SQL INSERT statements.
130+
pub fn bulk_ingest(
131+
&mut self,
132+
target_table: &str,
133+
target_db_schema: Option<&str>,
134+
mode: options::IngestMode,
135+
batch: RecordBatch,
136+
) -> Result<Option<i64>> {
137+
self.bulk_ingest_stream(
138+
target_table,
139+
target_db_schema,
140+
mode,
141+
Box::new(arrow_array::RecordBatchIterator::new(
142+
std::iter::once(Ok(batch.clone())),
143+
batch.schema(),
144+
)),
145+
)
146+
}
147+
148+
/// Bulk-ingest a stream of [`RecordBatch`]es into a target table using a
149+
/// single ADBC statement with `bind_stream`.
150+
///
151+
/// This is more efficient than calling [`bulk_ingest`](Self::bulk_ingest)
152+
/// per batch because it reuses the same statement and network connection.
153+
pub fn bulk_ingest_stream(
154+
&mut self,
155+
target_table: &str,
156+
target_db_schema: Option<&str>,
157+
mode: options::IngestMode,
158+
reader: Box<dyn arrow_array::RecordBatchReader + Send>,
159+
) -> Result<Option<i64>> {
160+
let mut stmt = self.conn.new_statement().map_err(|e| Error::ExecuteQuery {
161+
reason: e.to_string(),
162+
})?;
163+
164+
stmt.set_option(
165+
options::OptionStatement::TargetTable,
166+
OptionValue::from(target_table),
167+
)
168+
.map_err(|e| Error::ExecuteQuery {
169+
reason: format!("Failed to set target table: {e}"),
170+
})?;
171+
172+
if let Some(schema) = target_db_schema {
173+
stmt.set_option(
174+
options::OptionStatement::TargetDbSchema,
175+
OptionValue::from(schema),
176+
)
177+
.map_err(|e| Error::ExecuteQuery {
178+
reason: format!("Failed to set target db schema: {e}"),
179+
})?;
180+
}
181+
182+
stmt.set_option(options::OptionStatement::IngestMode, mode.into())
183+
.map_err(|e| Error::ExecuteQuery {
184+
reason: format!("Failed to set ingest mode: {e}"),
185+
})?;
186+
187+
stmt.bind_stream(reader).map_err(|e| Error::ExecuteQuery {
188+
reason: format!("Failed to bind stream for bulk ingest: {e}"),
189+
})?;
190+
191+
stmt.execute_update().map_err(|e| Error::ExecuteQuery {
192+
reason: format!("Bulk ingest execution failed: {e}"),
193+
})
194+
}
121195
}

0 commit comments

Comments
 (0)