Skip to content

Commit f396029

Browse files
phillipleblancJeadie
authored andcommitted
feat: Add custom_image input to debug Spice Cloud workflow (#238)
* feat: Add custom_image input to debug Spice Cloud workflow Add a custom_image workflow input to run_spicebench_debug_spice_cloud that allows specifying a custom runtime container image (e.g. ghcr.io/spiceai/spiceai-dev:spicebench-sf10) instead of the default nightly image. When set, the image reference is parsed into registry, image name, and tag components and passed through to spidapter as SPIDAPTER_IMAGE_REGISTRY, SPIDAPTER_IMAGE_NAME, and SPIDAPTER_IMAGE_TAG env vars. The channel is automatically switched to internal. Also adds executor_memory_limit input and fixes NUM_QUERY_CLIENTS to match the main workflow (2 instead of 8). * fix: Run row count validation first in checkpoint validation Move table row count validation to run as the first phase (Phase 0) before the probe query. Row count queries are cheap SELECT COUNT(*) and immediately surface data loss or duplication without waiting for expensive analytical queries to converge. * chore: Add per-batch operation row count logging for data reconciliation Log insert/update/delete row counts for each batch written through write_segments_for_batch. This covers both the initialization phase and the main ETL run pipeline. Example log output: INFO etl: Writing segments for batch table=customer batch_id=5 segments=3 insert_rows=8192 update_rows=512 delete_rows=128 This allows post-hoc reconciliation: summing insert_rows - delete_rows per table should match the expected row count at each checkpoint. If there is a mismatch, the per-batch logs pinpoint which batch_id had unexpected operation counts.
1 parent 21f9a0c commit f396029

5 files changed

Lines changed: 208 additions & 32 deletions

File tree

.github/workflows/run_spicebench_debug_spice_cloud.yml

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ on:
4646
required: false
4747
default: 'latest'
4848
type: string
49+
num_query_clients:
50+
description: 'Number of concurrent query clients'
51+
required: false
52+
default: '2'
53+
type: string
4954
enable_module_debug_logging:
5055
description: 'Enable debug logs'
5156
required: false
@@ -61,11 +66,26 @@ on:
6166
required: false
6267
default: false
6368
type: boolean
69+
executor_replicas:
70+
description: 'Number of executor replicas'
71+
required: false
72+
default: '4'
73+
type: string
6474
app_memory_limit:
6575
description: 'Memory limit for the scheduler (app) pod (e.g. 16Gi, 20Gi)'
6676
required: false
6777
default: '16Gi'
6878
type: string
79+
executor_memory_limit:
80+
description: 'Memory limit for the executor pod (e.g. 16Gi, 62Gi)'
81+
required: false
82+
default: '16Gi'
83+
type: string
84+
custom_image:
85+
description: 'Custom runtime container image (e.g. ghcr.io/spiceai/spiceai-dev:spicebench-sf10). Overrides the default channel image. Requires the internal update channel on the target SCP environment.'
86+
required: false
87+
default: ''
88+
type: string
6989
jobs:
7090
run-spicebench:
7191
name: Run spicebench
@@ -134,7 +154,7 @@ jobs:
134154
SCENARIO: ${{ github.event.inputs.scenario || 'tpch' }}
135155
SYSTEM_UNDER_TEST: ${{ github.event.inputs.system_under_test || 'spice_cloud' }}
136156
SYSTEM_ADAPTER: ${{ github.event.inputs.system_under_test || 'spice_cloud' }}
137-
NUM_QUERY_CLIENTS: '8'
157+
NUM_QUERY_CLIENTS: ${{ github.event.inputs.num_query_clients || '2' }}
138158
ETL_BUCKET: 'spicebench'
139159
ETL_PREFIX: ${{ github.event.inputs.etl_type == 'changes' && 'data-gen-mutable' || 'data-gen' }}
140160
SCALE_FACTOR: ${{ github.event.inputs.scale_factor || '1' }}
@@ -154,7 +174,11 @@ jobs:
154174
SPIDAPTER_ICEBERG_CATALOG_FROM: iceberg:https://glue.us-west-1.amazonaws.com/iceberg/v1/catalogs/211125479522/namespaces
155175
DISABLE_TEARDOWN: ${{ github.event.inputs.disable_teardown || 'false' }}
156176
ENABLE_PVC: ${{ github.event.inputs.enable_pvc || 'false' }}
177+
EXECUTOR_REPLICAS: ${{ github.event.inputs.executor_replicas || '4' }}
157178
SPIDAPTER_APP_MEMORY_LIMIT: ${{ github.event.inputs.app_memory_limit || '16Gi' }}
179+
SPIDAPTER_EXECUTOR_MEMORY_LIMIT: ${{ github.event.inputs.executor_memory_limit || '16Gi' }}
180+
SPIDAPTER_ORGANIZATION_TAG: 'spicehq'
181+
CUSTOM_IMAGE: ${{ github.event.inputs.custom_image || '' }}
158182
run: |
159183
set -euo pipefail
160184
if [ "${ENABLE_MODULE_DEBUG_LOGGING}" = "true" ]; then
@@ -198,16 +222,47 @@ jobs:
198222
export SPICEBENCH_ADBC_FLUSH_STREAM_BEFORE_UPSERT=true
199223
export SPICEBENCH_ADBC_DELETE_BATCH_SIZE=50000
200224
ADAPTER_CMD="docker"
201-
ADAPTER_DOCKER_OPTS="run -i -e SPIDAPTER_EXECUTOR_REPLICAS=4 -e SPICEAI_API_KEY -e SPICE_CLOUD_API_URL -e AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY} -e SPIDAPTER_ICEBERG_REGION -e SPIDAPTER_ICEBERG_CATALOG_FROM -e SCHEDULER_STATE_LOCATION"
225+
ADAPTER_DOCKER_OPTS="run -i -e SPIDAPTER_EXECUTOR_REPLICAS=${EXECUTOR_REPLICAS} -e SPICEAI_API_KEY -e SPICE_CLOUD_API_URL -e AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY} -e SPIDAPTER_ICEBERG_REGION -e SPIDAPTER_ICEBERG_CATALOG_FROM -e SCHEDULER_STATE_LOCATION"
202226
203227
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_APP_MEMORY_LIMIT=${SPIDAPTER_APP_MEMORY_LIMIT}"
228+
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_EXECUTOR_MEMORY_LIMIT=${SPIDAPTER_EXECUTOR_MEMORY_LIMIT}"
229+
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_ORGANIZATION_TAG"
230+
231+
# Parse custom image into registry, name, and tag for spidapter.
232+
# Expected format: registry/image:tag (e.g. ghcr.io/spiceai/spiceai-dev:spicebench-sf10)
233+
# Parses as: registry=ghcr.io, image=spiceai/spiceai-dev, tag=spicebench-sf10
234+
if [ -n "${CUSTOM_IMAGE}" ]; then
235+
IMAGE_WITH_TAG="${CUSTOM_IMAGE}"
236+
if [[ "${IMAGE_WITH_TAG}" == *":"* ]]; then
237+
SPIDAPTER_IMAGE_TAG="${IMAGE_WITH_TAG##*:}"
238+
IMAGE_WITHOUT_TAG="${IMAGE_WITH_TAG%:*}"
239+
else
240+
SPIDAPTER_IMAGE_TAG=""
241+
IMAGE_WITHOUT_TAG="${IMAGE_WITH_TAG}"
242+
fi
243+
# Registry is the first path component (hostname), image is the rest
244+
SPIDAPTER_IMAGE_REGISTRY="${IMAGE_WITHOUT_TAG%%/*}"
245+
SPIDAPTER_IMAGE_NAME="${IMAGE_WITHOUT_TAG#*/}"
246+
247+
echo "Custom image: registry=${SPIDAPTER_IMAGE_REGISTRY}, name=${SPIDAPTER_IMAGE_NAME}, tag=${SPIDAPTER_IMAGE_TAG}"
248+
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_IMAGE_REGISTRY=${SPIDAPTER_IMAGE_REGISTRY} -e SPIDAPTER_IMAGE_NAME=${SPIDAPTER_IMAGE_NAME}"
249+
if [ -n "${SPIDAPTER_IMAGE_TAG}" ]; then
250+
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_IMAGE_TAG=${SPIDAPTER_IMAGE_TAG}"
251+
fi
252+
fi
204253
205254
if [ "${ENABLE_PVC}" = "true" ]; then
206255
echo "PVC enabled: app=3GB, executor=2GB"
207256
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_APP_STORAGE_SIZE_GB=3 -e SPIDAPTER_EXECUTOR_STORAGE_SIZE_GB=2 -e SPIDAPTER_CAYENNE_DATA_DIR=/data/data -e SPIDAPTER_CAYENNE_METADATA_DIR=/data/metadata"
208257
fi
209258
210-
ADAPTER_ARGS="${ADAPTER_DOCKER_OPTS} ghcr.io/spiceai/spidapter:${{ github.event.inputs.spidapter_version || 'latest' }} stdio --verbose --channel nightly"
259+
# Use internal channel when a custom image is specified, otherwise nightly.
260+
SPIDAPTER_CHANNEL="nightly"
261+
if [ -n "${CUSTOM_IMAGE}" ]; then
262+
SPIDAPTER_CHANNEL="internal"
263+
fi
264+
265+
ADAPTER_ARGS="${ADAPTER_DOCKER_OPTS} ghcr.io/spiceai/spidapter:${{ github.event.inputs.spidapter_version || 'latest' }} stdio --verbose --channel ${SPIDAPTER_CHANNEL}"
211266
ADAPTER_ENVS=""
212267
213268
NO_TEARDOWN_ARG=""

crates/data-generation/src/dataset/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,31 @@ use crate::dataset::simple_sequence::SimpleSequenceDataset;
3131
use crate::dataset::tpch::TpchDataset;
3232
use crate::storage::DataStorage;
3333

34+
/// ETL mode derived from mutation ratios.
35+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36+
pub enum EtlType {
37+
/// Append-only data (no updates/deletes).
38+
Events,
39+
/// Change-data mode (updates and/or deletes present).
40+
Changes,
41+
}
42+
43+
impl EtlType {
44+
#[must_use]
45+
pub fn as_str(self) -> &'static str {
46+
match self {
47+
Self::Events => "events",
48+
Self::Changes => "changes",
49+
}
50+
}
51+
}
52+
53+
impl std::fmt::Display for EtlType {
54+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55+
f.write_str(self.as_str())
56+
}
57+
}
58+
3459
/// Metadata about a table in a dataset.
3560
#[derive(Debug, Clone)]
3661
pub struct DatasetTable {
@@ -156,6 +181,15 @@ impl MutationConfig {
156181
delete_ratio,
157182
}
158183
}
184+
185+
#[must_use]
186+
pub fn etl_type(&self) -> EtlType {
187+
if self.update_ratio == 0.0 && self.delete_ratio == 0.0 {
188+
EtlType::Events
189+
} else {
190+
EtlType::Changes
191+
}
192+
}
159193
}
160194

161195
#[async_trait]

crates/data-generation/src/version.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use serde::{Deserialize, Serialize};
2727
use serde_json::Value;
2828

2929
use crate::config::DatasetConfig;
30-
use crate::dataset::MutationConfig;
30+
use crate::dataset::{EtlType, MutationConfig};
3131

3232
/// Converts an Arrow [`SchemaRef`] to a JSON-compatible representation
3333
/// using Arrow's built-in IPC JSON serialization (the "Schema" portion
@@ -105,16 +105,9 @@ impl VersionMetadata {
105105
}
106106

107107
/// Returns the ETL type based on the mutation configuration.
108-
///
109-
/// - `"events"` — append-only data (no updates or deletes).
110-
/// - `"changes"` — data with mutations (updates and/or deletes).
111108
#[must_use]
112-
pub fn etl_type(&self) -> &'static str {
113-
if self.mutations.update_ratio == 0.0 && self.mutations.delete_ratio == 0.0 {
114-
"events"
115-
} else {
116-
"changes"
117-
}
109+
pub fn etl_type(&self) -> EtlType {
110+
self.mutation_config().etl_type()
118111
}
119112
}
120113

crates/etl/src/lib.rs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -666,13 +666,11 @@ fn strip_internal_columns(batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
666666
fn batch_is_insert_only(batch: &RecordBatch, mutations: &MutationConfig) -> anyhow::Result<bool> {
667667
// Naive check: if the dataset was generated with non-zero update or delete
668668
// ratios, batches may contain `_op` values other than "c".
669-
if mutations.update_ratio == 0.0 && mutations.delete_ratio == 0.0 {
669+
if mutations.etl_type() != data_generation::dataset::EtlType::Changes {
670670
return Ok(true);
671671
}
672672

673-
let schema = batch.schema();
674-
675-
let op_idx = match schema.index_of("_op") {
673+
let op_idx = match batch.schema().index_of("_op") {
676674
Ok(idx) => idx,
677675
Err(_) => return Ok(true),
678676
};
@@ -1197,7 +1195,12 @@ impl ETLPipeline {
11971195
.map(|(name, table)| {
11981196
let schema =
11991197
schema_with_created_at(&schema_without_internal_columns(&table.schema));
1200-
let primary_key_columns = dataset.primary_key(&name);
1198+
let primary_key_columns =
1199+
if mutations.etl_type() == data_generation::dataset::EtlType::Changes {
1200+
dataset.primary_key(&name)
1201+
} else {
1202+
Vec::new()
1203+
};
12011204
let config = ProtocolDatasetConfig {
12021205
schema,
12031206
primary_key_columns,
@@ -2009,6 +2012,9 @@ mod tests {
20092012
use arrow::array::{RecordBatch, StringViewArray};
20102013
use arrow::datatypes::{DataType, Field, Schema};
20112014
use arrow::ipc::writer::StreamWriter;
2015+
use data_generation::config::DatasetConfig as GenerationDatasetConfig;
2016+
use data_generation::dataset::MutationConfig;
2017+
use data_generation::storage::file::FileStorage;
20122018
use std::sync::Arc;
20132019

20142020
/// Measures the IPC-serialized size of a [`RecordBatch`].
@@ -2090,4 +2096,63 @@ mod tests {
20902096
);
20912097
}
20922098
}
2099+
2100+
#[test]
2101+
fn create_tables_request_datasets_omits_primary_keys_for_events() {
2102+
let config = GenerationDatasetConfig {
2103+
dataset_type: "simple_sequence".to_string(),
2104+
scale_factor: 0.01,
2105+
num_steps: 1,
2106+
};
2107+
let tempdir = tempfile::tempdir().expect("tempdir");
2108+
let storage = Arc::new(FileStorage::new(tempdir.path().to_path_buf()));
2109+
let mutations = MutationConfig::new(0.0, 0.0);
2110+
2111+
let datasets = ETLPipeline::create_tables_request_datasets(
2112+
DatasetSource::SimpleSequence,
2113+
&config,
2114+
storage,
2115+
&mutations,
2116+
None,
2117+
)
2118+
.expect("create tables datasets");
2119+
2120+
let table_cfg = datasets
2121+
.get("integer_sequence")
2122+
.expect("integer_sequence config");
2123+
assert!(
2124+
table_cfg.primary_key_columns.is_empty(),
2125+
"events ETL should not provide primary keys"
2126+
);
2127+
}
2128+
2129+
#[test]
2130+
fn create_tables_request_datasets_includes_primary_keys_for_changes() {
2131+
let config = GenerationDatasetConfig {
2132+
dataset_type: "simple_sequence".to_string(),
2133+
scale_factor: 0.01,
2134+
num_steps: 1,
2135+
};
2136+
let tempdir = tempfile::tempdir().expect("tempdir");
2137+
let storage = Arc::new(FileStorage::new(tempdir.path().to_path_buf()));
2138+
let mutations = MutationConfig::new(0.1, 0.0);
2139+
2140+
let datasets = ETLPipeline::create_tables_request_datasets(
2141+
DatasetSource::SimpleSequence,
2142+
&config,
2143+
storage,
2144+
&mutations,
2145+
None,
2146+
)
2147+
.expect("create tables datasets");
2148+
2149+
let table_cfg = datasets
2150+
.get("integer_sequence")
2151+
.expect("integer_sequence config");
2152+
assert_eq!(
2153+
table_cfg.primary_key_columns,
2154+
vec!["id".to_string()],
2155+
"changes ETL should provide primary keys"
2156+
);
2157+
}
20932158
}

src/commands/load/mod.rs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -653,8 +653,48 @@ async fn run_checkpoint_validation(
653653
return CheckpointValidationResult::TimedOut;
654654
};
655655

656+
// Phase 0: validate table row counts first as a fast correctness probe.
657+
// Row count queries are cheap and immediately surface data loss/duplication
658+
// without waiting for expensive analytical queries to converge.
659+
println!("Checkpoint {checkpoint_idx}: validating table row counts before probing queries",);
660+
{
661+
let mut row_count_ticker = tokio::time::interval(probe_period);
662+
let mut row_count_attempt = 0u64;
663+
loop {
664+
if tokio::time::Instant::now() >= deadline {
665+
println!(
666+
"Checkpoint {checkpoint_idx}: row count validation timed out after {} attempts",
667+
row_count_attempt
668+
);
669+
return CheckpointValidationResult::TimedOut;
670+
}
671+
672+
tokio::select! {
673+
biased;
674+
_ = signal::ctrl_c() => return CheckpointValidationResult::Interrupted,
675+
_ = row_count_ticker.tick() => {}
676+
};
677+
678+
row_count_attempt += 1;
679+
if validate_checkpoint_table_row_counts(
680+
executor,
681+
expected_row_counts,
682+
checkpoint_idx,
683+
query_catalog_namespace,
684+
)
685+
.await
686+
{
687+
break;
688+
}
689+
690+
println!(
691+
"Checkpoint {checkpoint_idx}: row count validation attempt {row_count_attempt} failed, retrying",
692+
);
693+
}
694+
}
695+
656696
println!(
657-
"Checkpoint {checkpoint_idx}: probing '{}' every {}s",
697+
"Checkpoint {checkpoint_idx}: row counts passed, probing '{}' every {}s",
658698
probe_query.name,
659699
probe_period.as_secs()
660700
);
@@ -663,7 +703,7 @@ async fn run_checkpoint_validation(
663703
let mut probe_count = 0u64;
664704

665705
loop {
666-
// Phase 1: probe Q1 until it passes
706+
// Phase 1: probe query until it passes
667707
let e2e_send_time = match probe_until_pass(
668708
executor,
669709
probe_query,
@@ -677,7 +717,7 @@ async fn run_checkpoint_validation(
677717
{
678718
ProbeOutcome::Passed(send_time) => {
679719
println!(
680-
"Checkpoint {checkpoint_idx}: probe query '{}' passed, validating table row counts",
720+
"Checkpoint {checkpoint_idx}: probe query '{}' passed, validating full query set",
681721
probe_query.name
682722
);
683723
send_time
@@ -691,17 +731,6 @@ async fn run_checkpoint_validation(
691731
return CheckpointValidationResult::TimedOut;
692732
}
693733

694-
if !validate_checkpoint_table_row_counts(
695-
executor,
696-
expected_row_counts,
697-
checkpoint_idx,
698-
query_catalog_namespace,
699-
)
700-
.await
701-
{
702-
continue;
703-
}
704-
705734
if validate_full_query_set(
706735
executor,
707736
queries,

0 commit comments

Comments
 (0)