Skip to content

Commit 8fe6fb7

Browse files
fix: Flush ADBC sink before resuming ETL after checkpoint pause
When the ETL pipeline pauses at a checkpoint boundary for validation, the reusable ADBC bulk ingest streams (long-lived DoPut connections) sit idle for the duration of the validation window (often 10+ minutes). These connections can go stale or be closed by the server during this period. When the pipeline resumes via continue_pipeline(), it reuses the same AdbcSink with its stale streams. The next write attempt hangs indefinitely on the dead connection, causing the ETL to stall at step 0/9 with zero progress. Fix: call data_sink.flush() in continue_pipeline() before spawning the new run task. This closes all reusable bulk ingest streams so they are recreated fresh when the first write of the new phase arrives. This requires making continue_pipeline() async since flush() is async.
1 parent 7d1f49a commit 8fe6fb7

4 files changed

Lines changed: 12 additions & 6 deletions

File tree

.github/workflows/run_spicebench_debug_spice_cloud.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ jobs:
177177
EXECUTOR_REPLICAS: ${{ github.event.inputs.executor_replicas || '4' }}
178178
SPIDAPTER_APP_MEMORY_LIMIT: ${{ github.event.inputs.app_memory_limit || '16Gi' }}
179179
SPIDAPTER_EXECUTOR_MEMORY_LIMIT: ${{ github.event.inputs.executor_memory_limit || '16Gi' }}
180+
SPIDAPTER_ORGANIZATION_TAG: 'spicehq'
180181
CUSTOM_IMAGE: ${{ github.event.inputs.custom_image || '' }}
181182
run: |
182183
set -euo pipefail
@@ -225,6 +226,7 @@ jobs:
225226
226227
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_APP_MEMORY_LIMIT=${SPIDAPTER_APP_MEMORY_LIMIT}"
227228
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_EXECUTOR_MEMORY_LIMIT=${SPIDAPTER_EXECUTOR_MEMORY_LIMIT}"
229+
ADAPTER_DOCKER_OPTS="${ADAPTER_DOCKER_OPTS} -e SPIDAPTER_ORGANIZATION_TAG"
228230
229231
# Parse custom image into registry, name, and tag for spidapter.
230232
# Expected format: registry/image:tag (e.g. ghcr.io/spiceai/spiceai-dev:spicebench-sf10)

crates/etl/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1457,7 +1457,7 @@ impl ETLPipeline {
14571457
/// [`PipelineState::Paused`].
14581458
///
14591459
/// Returns an error if the pipeline is not in the [`Paused`] state.
1460-
pub fn continue_pipeline(&mut self) -> anyhow::Result<()> {
1460+
pub async fn continue_pipeline(&mut self) -> anyhow::Result<()> {
14611461
let current_state = self.state_rx.borrow().clone();
14621462
if current_state != PipelineState::Paused {
14631463
anyhow::bail!(
@@ -1476,6 +1476,12 @@ impl ETLPipeline {
14761476
handle.abort();
14771477
}
14781478

1479+
// Flush the sink to close any stale reusable connections (e.g. ADBC
1480+
// bulk ingest streams) that may have gone idle during the checkpoint
1481+
// validation window. Without this, resumed writes can hang on dead
1482+
// connections that were held open across the pause.
1483+
self.data_sink.flush().await?;
1484+
14791485
self.spawn_run_task(self.batch_budget);
14801486
Ok(())
14811487
}

src/commands/checkpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ async fn execute_duckdb(args: &CheckpointArgs) -> anyhow::Result<()> {
282282
.await?;
283283
checkpoint_idx += 1;
284284

285-
pipeline.continue_pipeline()?;
285+
pipeline.continue_pipeline().await?;
286286
}
287287
PipelineState::Stopped(StopReason::Completed) => {
288288
tracing::info!(

src/commands/load/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -656,9 +656,7 @@ async fn run_checkpoint_validation(
656656
// Phase 0: validate table row counts first as a fast correctness probe.
657657
// Row count queries are cheap and immediately surface data loss/duplication
658658
// without waiting for expensive analytical queries to converge.
659-
println!(
660-
"Checkpoint {checkpoint_idx}: validating table row counts before probing queries",
661-
);
659+
println!("Checkpoint {checkpoint_idx}: validating table row counts before probing queries",);
662660
{
663661
let mut row_count_ticker = tokio::time::interval(probe_period);
664662
let mut row_count_attempt = 0u64;
@@ -1007,7 +1005,7 @@ pub(crate) async fn run(
10071005
}
10081006
}
10091007

1010-
if let Err(e) = etl_pipeline.continue_pipeline() {
1008+
if let Err(e) = etl_pipeline.continue_pipeline().await {
10111009
eprintln!("Failed to continue ETL pipeline after pause: {e}");
10121010
shutdown_token.cancel();
10131011
break Some(RunOutcome::PipelineFailure(format!("Failed to continue ETL pipeline: {e}")));

0 commit comments

Comments
 (0)