Skip to content

Commit 1efa923

Browse files
committed
Improvements
1 parent 4441693 commit 1efa923

10 files changed

Lines changed: 166 additions & 41 deletions

File tree

.github/workflows/run_spicebench.yml

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ on:
1414
options:
1515
- tpch
1616
system_under_test:
17-
description: 'System under test (spice_cloud via spidapter docker image, or local databricks adapter modes)'
17+
description: 'System under test (spice_cloud via spidapter docker image, local databricks adapter modes, or postgres via spidapter local backend)'
1818
required: true
1919
default: spice_cloud
2020
type: choice
2121
options:
2222
- spice_cloud
2323
- databricks-sql
2424
- databricks-lakebase
25+
- postgres
2526
etl_type:
2627
description: 'ETL type'
2728
required: true
@@ -64,15 +65,15 @@ jobs:
6465
client-secret: ${{ secrets.SPICE_MANAGEMENT_CLIENT_SECRET_PROD }}
6566

6667
- name: Log in to GHCR
67-
if: ${{ env.SYSTEM_UNDER_TEST == 'spice_cloud' }}
68+
if: ${{ env.SYSTEM_UNDER_TEST == 'spice_cloud' || env.SYSTEM_UNDER_TEST == 'postgres' }}
6869
uses: docker/login-action@v3
6970
with:
7071
registry: ghcr.io
7172
username: ${{ github.actor }}
7273
password: ${{ secrets.GITHUB_TOKEN }}
7374

7475
- name: pull spidapter image
75-
if: ${{ env.SYSTEM_UNDER_TEST == 'spice_cloud' }}
76+
if: ${{ env.SYSTEM_UNDER_TEST == 'spice_cloud' || env.SYSTEM_UNDER_TEST == 'postgres' }}
7677
run: docker pull ghcr.io/spiceai/spidapter:latest
7778

7879
- uses: ./.github/actions/build-spicebench
@@ -112,6 +113,9 @@ jobs:
112113
DATABRICKS_SQL_WAREHOUSE_ID: ${{ secrets.DATABRICKS_SQL_WAREHOUSE_ID }}
113114
DATABRICKS_CATALOG: ${{ secrets.DATABRICKS_CATALOG }}
114115
DATABRICKS_SCHEMA: ${{ secrets.DATABRICKS_SCHEMA }}
116+
PG_HOST: ${{ vars.POSTGRES_PG_HOST }} # read from vars, matching the benchmark run step, so the pre-flight check validates the value actually used
117+
PG_USER: ${{ secrets.POSTGRES_PG_USER }}
118+
PG_DATABASE: ${{ vars.POSTGRES_PG_DATABASE }}
115119
run: |
116120
set -euo pipefail
117121
SYSTEM_UNDER_TEST_PREFIX="${SYSTEM_UNDER_TEST%%-*}"
@@ -165,6 +169,25 @@ jobs:
165169
"${HOME}/.spice/bin/databricks-system-adapter" --help >/dev/null
166170
;;
167171
172+
postgres)
173+
for required_var in PG_HOST PG_USER PG_DATABASE; do
174+
if [ -z "${!required_var:-}" ]; then
175+
echo "${required_var} must be set for postgres adapter mode"
176+
exit 1
177+
fi
178+
done
179+
180+
if ! command -v docker >/dev/null 2>&1; then
181+
echo "docker is required for postgres mode"
182+
exit 1
183+
fi
184+
185+
docker image inspect ghcr.io/spiceai/spidapter:latest >/dev/null 2>&1 || {
186+
echo "spidapter docker image not found locally; pull step may have failed"
187+
exit 1
188+
}
189+
;;
190+
168191
*)
169192
echo "Unsupported system_under_test value: ${SYSTEM_UNDER_TEST}"
170193
exit 1
@@ -196,7 +219,7 @@ jobs:
196219
sudo ldconfig
197220
198221
- name: Install ADBC Postgres driver
199-
if: ${{ startsWith(env.SYSTEM_UNDER_TEST, 'databricks-') }}
222+
if: ${{ startsWith(env.SYSTEM_UNDER_TEST, 'databricks-') || env.SYSTEM_UNDER_TEST == 'postgres' }}
200223
uses: columnar-tech/setup-dbc@v1
201224
with:
202225
drivers: postgresql
@@ -239,6 +262,11 @@ jobs:
239262
LAKEBASE_PG_SCHEMA: ${{ vars.LAKEBASE_PG_SCHEMA }}
240263
LAKEBASE_PROJECT: ${{ vars.LAKEBASE_PROJECT }}
241264
LAKEBASE_BRANCH: ${{ vars.LAKEBASE_BRANCH }}
265+
PG_HOST: ${{ vars.POSTGRES_PG_HOST }}
266+
PG_PORT: ${{ vars.POSTGRES_PG_PORT || '5432' }}
267+
PG_USER: ${{ secrets.POSTGRES_PG_USER }}
268+
PG_PASSWORD: ${{ secrets.POSTGRES_PG_PASSWORD }}
269+
PG_DATABASE: ${{ vars.POSTGRES_PG_DATABASE }}
242270
SPIDAPTER_ICEBERG_REGION: us-west-1
243271
SPIDAPTER_ICEBERG_CATALOG_FROM: iceberg:https://glue.us-west-1.amazonaws.com/iceberg/v1/catalogs/211125479522/namespaces
244272
SPIDAPTER_APP_MEMORY_LIMIT: '62Gi'
@@ -307,6 +335,20 @@ jobs:
307335
if [ -n "${DATABRICKS_STAGING_VOLUME_PATH:-}" ]; then
308336
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env DATABRICKS_STAGING_VOLUME_PATH=${DATABRICKS_STAGING_VOLUME_PATH}"
309337
fi
338+
339+
if [ "${SYSTEM_UNDER_TEST}" = "databricks-lakebase" ]; then
340+
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env DATABRICKS_COMPUTE_MODE=lakebase"
341+
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PG_HOST=${LAKEBASE_PG_HOST}"
342+
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PG_USER=${LAKEBASE_PG_USER}"
343+
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PG_SCHEMA=${LAKEBASE_PG_SCHEMA}"
344+
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PROJECT=${LAKEBASE_PROJECT}"
345+
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_BRANCH=${LAKEBASE_BRANCH}"
346+
fi
347+
elif [ "${SYSTEM_UNDER_TEST_PREFIX}" = "postgres" ]; then
348+
export SPICEBENCH_ADBC_UPDATE_STRATEGY=statement
349+
ADAPTER_CMD="docker"
350+
ADAPTER_ARGS="run -i -e PG_HOST=${PG_HOST} -e PG_PORT=${PG_PORT} -e PG_USER=${PG_USER} -e PG_PASSWORD=${PG_PASSWORD} -e PG_DATABASE=${PG_DATABASE} ghcr.io/spiceai/spidapter:latest stdio --backend local --deployment-mode single-node"
351+
ADAPTER_ENVS=""
310352
else
311353
export SPICEBENCH_ADBC_UPDATE_STRATEGY=bulk_ingest_upsert
312354
export SPICEBENCH_ADBC_FLUSH_STREAM_BEFORE_UPSERT=true
@@ -326,15 +368,6 @@ jobs:
326368
ADAPTER_ENVS=""
327369
fi
328370
329-
if [ "${SYSTEM_UNDER_TEST}" = "databricks-lakebase" ]; then
330-
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env DATABRICKS_COMPUTE_MODE=lakebase"
331-
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PG_HOST=${LAKEBASE_PG_HOST}"
332-
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PG_USER=${LAKEBASE_PG_USER}"
333-
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PG_SCHEMA=${LAKEBASE_PG_SCHEMA}"
334-
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_PROJECT=${LAKEBASE_PROJECT}"
335-
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env LAKEBASE_BRANCH=${LAKEBASE_BRANCH}"
336-
fi
337-
338371
~/.spice/bin/spicebench run \
339372
--concurrency "${NUM_QUERY_CLIENTS}" \
340373
--scenario "${SCENARIO}" \

crates/adbc_client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl AdbcConnection {
118118

119119
Ok(Self::new(
120120
conn,
121-
driver_name == "databricks",
121+
driver_name == "databricks" || driver_name.eq_ignore_ascii_case("dynamodb"),
122122
driver_name == "postgresql",
123123
driver_name == "postgresql" || driver_name == "databricks",
124124
))

crates/adbc_client/src/pool.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,11 @@ pub fn create_pool(
150150
reason: e.to_string(),
151151
})?;
152152

153+
let downcast_utf8view =
154+
driver_name == "databricks" || driver_name.eq_ignore_ascii_case("dynamodb");
153155
let manager = AdbcConnectionManager::new(
154156
db,
155-
driver_name == "databricks",
157+
downcast_utf8view,
156158
driver_name == "postgresql",
157159
driver_name == "postgresql" || driver_name == "databricks",
158160
);

crates/etl/src/lib.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ async fn write_segments_for_batch(
917917
partition_columns,
918918
)
919919
.await
920-
.map_err(|e| format!("write {table_name} batch {batch_id}: {e}"))
920+
.map_err(|e| format!("write {table_name} batch {batch_id}: {e:#}"))
921921
});
922922
}
923923

@@ -1959,28 +1959,49 @@ async fn run_pipeline(
19591959
}
19601960

19611961
// Collect results from all concurrent table tasks in this step.
1962+
// Also watch for cancellation so a ctrl-c that arrives while tasks are
1963+
// blocked in a slow sink write (e.g. DynamoDB ADBC bulk ingest) doesn't
1964+
// leave the pipeline stuck waiting for the current step to finish.
19621965
let mut step_batch_count: u64 = 0;
19631966
let mut step_rows_count: u64 = 0;
1964-
while let Some(result) = join_set.join_next().await {
1965-
match result {
1966-
Ok(Ok((table_name, is_finished, consumed_work_units, rows_read))) => {
1967-
step_batch_count += consumed_work_units;
1968-
step_rows_count += rows_read;
1969-
if is_finished {
1970-
let mut state = work_state.lock().expect("work_state lock poisoned");
1971-
state.finished_tables.insert(table_name);
1972-
tables_finished_counter.fetch_add(1, Ordering::Relaxed);
1973-
}
1974-
}
1975-
Ok(Err(err_msg)) => {
1967+
loop {
1968+
tokio::select! {
1969+
biased;
1970+
() = cancel.cancelled() => {
1971+
join_set.abort_all();
19761972
progress_logger.abort();
1977-
return PipelineState::Stopped(StopReason::Error(err_msg));
1973+
return PipelineState::Stopped(StopReason::Cancelled);
19781974
}
1979-
Err(e) => {
1980-
progress_logger.abort();
1981-
return PipelineState::Stopped(StopReason::Error(format!(
1982-
"Task panicked: {e}"
1983-
)));
1975+
result = join_set.join_next() => {
1976+
let Some(result) = result else { break; };
1977+
match result {
1978+
Ok(Ok((table_name, is_finished, consumed_work_units, rows_read))) => {
1979+
step_batch_count += consumed_work_units;
1980+
step_rows_count += rows_read;
1981+
if is_finished {
1982+
let mut state = work_state.lock().expect("work_state lock poisoned");
1983+
state.finished_tables.insert(table_name);
1984+
tables_finished_counter.fetch_add(1, Ordering::Relaxed);
1985+
}
1986+
}
1987+
Ok(Err(err_msg)) => {
1988+
progress_logger.abort();
1989+
return PipelineState::Stopped(StopReason::Error(err_msg));
1990+
}
1991+
Err(e) if e.is_cancelled() => {
1992+
// Task was aborted rather than panicked (JoinError::is_cancelled). The
1993+
// cancel branch above returns right after abort_all; treat this as a soft cancel too.
1994+
join_set.abort_all();
1995+
progress_logger.abort();
1996+
return PipelineState::Stopped(StopReason::Cancelled);
1997+
}
1998+
Err(e) => {
1999+
progress_logger.abort();
2000+
return PipelineState::Stopped(StopReason::Error(format!(
2001+
"Task panicked: {e}"
2002+
)));
2003+
}
2004+
}
19842005
}
19852006
}
19862007
}

crates/etl/src/sink/adbc.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ pub struct AdbcSink {
178178
bulk_ingest_stream_buffer: usize,
179179
/// Optional system adapter client for staging table creation.
180180
staging_adapter: Option<(Arc<Mutex<SystemAdapterClient>>, Uuid)>,
181+
/// Optional mapping from logical dataset name to physical table name.
182+
/// When set, overrides the table name used for ADBC bulk ingest so the
183+
/// sink writes to the correct physical table (e.g. DynamoDB prefixed names).
184+
table_name_map: HashMap<String, String>,
181185
}
182186

183187
impl AdbcSink {
@@ -243,6 +247,7 @@ impl AdbcSink {
243247
target_db_catalog: Option<String>,
244248
target_db_schema: Option<String>,
245249
staging_adapter: Option<(Arc<Mutex<SystemAdapterClient>>, Uuid)>,
250+
table_name_map: HashMap<String, String>,
246251
) -> anyhow::Result<Self> {
247252
let update_strategy = UpdateStrategy::from_env()?;
248253
let pool_size = Self::pool_size();
@@ -278,6 +283,7 @@ impl AdbcSink {
278283
flush_stream_before_upsert,
279284
bulk_ingest_stream_buffer,
280285
staging_adapter,
286+
table_name_map,
281287
})
282288
}
283289

@@ -386,11 +392,32 @@ impl AdbcSink {
386392
};
387393

388394
for sub_batch in sub_batches {
389-
sender.send(sub_batch).await.map_err(|_| {
390-
anyhow::anyhow!(
391-
"Bulk ingest stream for table '{table_name}' is no longer available"
392-
)
393-
})?;
395+
if sender.send(sub_batch).await.is_err() {
396+
// Worker exited before receiving this batch. Remove the stream and
397+
// await the worker to surface the actual error rather than a generic
398+
// "no longer available" message.
399+
let stream = {
400+
let mut streams = self.bulk_ingest_streams.write().await;
401+
streams.remove(table_name)
402+
};
403+
let err = if let Some(stream) = stream {
404+
match stream.worker.await {
405+
Ok(Ok(())) => anyhow::anyhow!(
406+
"Bulk ingest worker for '{table_name}' exited without error but before all data was sent"
407+
),
408+
Ok(Err(worker_err)) => worker_err
409+
.context(format!("Bulk ingest worker for '{table_name}' failed")),
410+
Err(join_err) => anyhow::anyhow!(
411+
"Bulk ingest worker for '{table_name}' panicked: {join_err}"
412+
),
413+
}
414+
} else {
415+
anyhow::anyhow!(
416+
"Bulk ingest stream for table '{table_name}' is no longer available"
417+
)
418+
};
419+
return Err(err);
420+
}
394421
batches_sent.fetch_add(1, Ordering::Relaxed);
395422
}
396423

@@ -530,7 +557,10 @@ impl AdbcSink {
530557
}
531558

532559
fn target_table_ingest_name(&self, table_name: &str) -> String {
533-
table_name.to_string()
560+
self.table_name_map
561+
.get(table_name)
562+
.cloned()
563+
.unwrap_or_else(|| table_name.to_string())
534564
}
535565

536566
fn create_table_sql(

crates/system-adapter-protocol/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ pub struct SetupResponse {
220220
/// (optional). Used to benchmark the distributed (Ballista) query path.
221221
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
222222
pub endpoints: HashMap<String, HashMap<String, serde_json::Value>>,
223+
/// Optional mapping from logical dataset name to physical table name.
224+
/// When set, the ETL sink will write to the physical name instead of the
225+
/// logical dataset name (e.g. DynamoDB uses timestamped table name prefixes
226+
/// to avoid collisions between concurrent benchmark runs).
227+
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
228+
pub table_name_map: HashMap<String, String>,
223229
}
224230
/// Request to teardown a benchmark run
225231
///

crates/system-adapter-protocol/src/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,13 @@ impl<H: Handler> Server<H> {
263263
};
264264
Self::handler_response(
265265
self.handler
266-
.setup(req.run_id, req.metadata, req.datasets, req.etl_sink_type, req.seed_data)
266+
.setup(
267+
req.run_id,
268+
req.metadata,
269+
req.datasets,
270+
req.etl_sink_type,
271+
req.seed_data,
272+
)
267273
.await,
268274
id,
269275
)

src/commands/etl_cmd.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ pub async fn execute(args: &EtlArgs) -> anyhow::Result<()> {
139139
args.adbc_catalog.clone(),
140140
args.adbc_schema.clone(),
141141
None,
142+
std::collections::HashMap::new(),
142143
)?);
143144

144145
(

src/commands/run.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ async fn run_benchmark(
173173
target_db_catalog,
174174
target_db_schema,
175175
Some((Arc::clone(&system_adapter_client), run_id)),
176+
setup_response.table_name_map.clone(),
176177
)?);
177178

178179
let mut pipeline = ETLPipeline::new(
@@ -428,8 +429,31 @@ fn make_zero_batch(
428429
DataType::Float64 => Arc::new(Float64Array::from(vec![0.0f64; n_rows])),
429430
DataType::Utf8 => Arc::new(StringArray::from(vec![""; n_rows])),
430431
DataType::LargeUtf8 => Arc::new(LargeStringArray::from(vec![""; n_rows])),
432+
DataType::Utf8View => Arc::new(StringViewArray::from(vec![""; n_rows])),
431433
DataType::Date32 => Arc::new(Date32Array::from(vec![0i32; n_rows])),
432434
DataType::Date64 => Arc::new(Date64Array::from(vec![0i64; n_rows])),
435+
DataType::Timestamp(unit, tz) => {
436+
use arrow::datatypes::TimeUnit;
437+
let arr: ArrayRef = match unit {
438+
TimeUnit::Second => Arc::new(
439+
arrow::array::TimestampSecondArray::from(vec![0i64; n_rows])
440+
.with_timezone_opt(tz.clone()),
441+
),
442+
TimeUnit::Millisecond => Arc::new(
443+
arrow::array::TimestampMillisecondArray::from(vec![0i64; n_rows])
444+
.with_timezone_opt(tz.clone()),
445+
),
446+
TimeUnit::Microsecond => Arc::new(
447+
arrow::array::TimestampMicrosecondArray::from(vec![0i64; n_rows])
448+
.with_timezone_opt(tz.clone()),
449+
),
450+
TimeUnit::Nanosecond => Arc::new(
451+
arrow::array::TimestampNanosecondArray::from(vec![0i64; n_rows])
452+
.with_timezone_opt(tz.clone()),
453+
),
454+
};
455+
arr
456+
}
433457
DataType::Decimal128(p, s) => Arc::new(
434458
Decimal128Array::from(vec![0i128; n_rows])
435459
.with_precision_and_scale(*p, *s)

system-adapters/databricks/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2140,6 +2140,7 @@ impl Handler for DatabricksAdapter {
21402140
HashMap::from([("uri".to_string(), Value::String(pg_uri))]),
21412141
)),
21422142
endpoints: HashMap::new(),
2143+
table_name_map: HashMap::new(),
21432144
})
21442145
}
21452146
// For other variants, return a single Databricks ADBC driver.
@@ -2155,6 +2156,7 @@ impl Handler for DatabricksAdapter {
21552156
catalog_namespace: Some(format!("{}.{}", self.config.catalog, self.config.schema)),
21562157
read_driver: None,
21572158
endpoints: HashMap::new(),
2159+
table_name_map: HashMap::new(),
21582160
}),
21592161
}
21602162
}

0 commit comments

Comments
 (0)