@@ -62,6 +62,13 @@ const ADBC_REUSE_BULK_INGEST_STREAMS_ENV: &str = "SPICEBENCH_ADBC_REUSE_BULK_ING
6262const DEFAULT_ADBC_BULK_INGEST_STREAM_BUFFER : usize = 1 ;
6363const ADBC_BULK_INGEST_STREAM_BUFFER_ENV : & str = "SPICEBENCH_ADBC_BULK_INGEST_STREAM_BUFFER" ;
6464
65+ /// Env var: when truthy and the update strategy is `bulk_ingest_upsert`, closes the reused
66+ /// bulk ingest stream for the table before sending upsert data, then writes the upsert via a
67+ /// direct (independent) ingest call. This avoids mixing upsert and subsequent insert rows in
68+ /// the same stream, which can cause duplicate rows on targets that apply upsert semantics
69+ /// per-call. Accepts 1/true/yes/on (ASCII case-insensitive); anything else or unset is off.
70+ const ADBC_FLUSH_STREAM_BEFORE_UPSERT_ENV : & str = "SPICEBENCH_ADBC_FLUSH_STREAM_BEFORE_UPSERT" ;
71+
6572/// Controls how UPDATE operations are executed.
6673///
6774/// - `statement` — row-by-row `UPDATE … SET … WHERE …` statements (default)
@@ -113,10 +120,17 @@ struct TableBulkIngestStream {
113120 schema : std:: sync:: Arc < Schema > ,
114121 sender : mpsc:: Sender < RecordBatch > ,
115122 worker : JoinHandle < anyhow:: Result < ( ) > > ,
123+ batches_sent : std:: sync:: Arc < AtomicU64 > ,
116124}
117125
118126impl TableBulkIngestStream {
119127 async fn close_and_wait ( self , table_name : & str ) -> anyhow:: Result < ( ) > {
128+ let batches = self . batches_sent . load ( Ordering :: Relaxed ) ;
129+ tracing:: debug!(
130+ table = %table_name,
131+ batches_sent = batches,
132+ "Flushing bulk ingest stream"
133+ ) ;
120134 drop ( self . sender ) ;
121135 self . worker . await . map_err ( |e| {
122136 anyhow:: anyhow!( "Bulk ingest worker task join failed for table '{table_name}': {e}" )
@@ -158,6 +172,7 @@ pub struct AdbcSink {
158172 bigint_suffix : bool ,
159173 update_strategy : UpdateStrategy ,
160174 reuse_bulk_ingest_streams : bool ,
175+ flush_stream_before_upsert : bool ,
161176 bulk_ingest_stream_buffer : usize ,
162177}
163178
@@ -192,6 +207,20 @@ impl AdbcSink {
192207 . unwrap_or ( true )
193208 }
194209
210+ fn flush_stream_before_upsert ( ) -> bool {
211+ std:: env:: var ( ADBC_FLUSH_STREAM_BEFORE_UPSERT_ENV )
212+ . ok ( )
213+ . and_then ( |raw| {
214+ let val = raw. trim ( ) . to_ascii_lowercase ( ) ;
215+ match val. as_str ( ) {
216+ "1" | "true" | "yes" | "on" => Some ( true ) ,
217+ "0" | "false" | "no" | "off" => Some ( false ) ,
218+ _ => None ,
219+ }
220+ } )
221+ . unwrap_or ( false )
222+ }
223+
195224 fn bulk_ingest_stream_buffer ( ) -> usize {
196225 std:: env:: var ( ADBC_BULK_INGEST_STREAM_BUFFER_ENV )
197226 . ok ( )
@@ -216,12 +245,16 @@ impl AdbcSink {
216245 let identifier_quote_char = AdbcConnectionManager :: identifier_quote_style ( driver_name) ;
217246 let bigint_suffix = AdbcConnectionManager :: bigint_suffix ( driver_name) ;
218247 let reuse_bulk_ingest_streams = Self :: reuse_bulk_ingest_streams ( ) ;
248+ let flush_stream_before_upsert = Self :: flush_stream_before_upsert ( ) ;
219249 let bulk_ingest_stream_buffer = Self :: bulk_ingest_stream_buffer ( ) ;
220250
221251 if reuse_bulk_ingest_streams {
222252 eprintln ! (
223253 "[adbc] Reusable bulk ingest streams enabled (buffer size: {bulk_ingest_stream_buffer})"
224254 ) ;
255+ if update_strategy == UpdateStrategy :: BulkIngestUpsert {
256+ eprintln ! ( "[adbc] Flush stream before upsert: {flush_stream_before_upsert}" ) ;
257+ }
225258 }
226259
227260 Ok ( Self {
@@ -234,6 +267,7 @@ impl AdbcSink {
234267 bigint_suffix,
235268 update_strategy,
236269 reuse_bulk_ingest_streams,
270+ flush_stream_before_upsert,
237271 bulk_ingest_stream_buffer,
238272 } )
239273 }
@@ -297,6 +331,7 @@ impl AdbcSink {
297331 schema,
298332 sender,
299333 worker,
334+ batches_sent : std:: sync:: Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
300335 }
301336 }
302337
@@ -311,26 +346,34 @@ impl AdbcSink {
311346 . map ( |b| b. schema ( ) )
312347 . ok_or_else ( || anyhow:: anyhow!( "Expected at least one insert batch" ) ) ?;
313348
314- let sender = {
349+ let ( sender, batches_sent ) = {
315350 let streams = self . bulk_ingest_streams . read ( ) . await ;
316351 streams. get ( table_name) . and_then ( |stream| {
317352 if stream. schema . as_ref ( ) == schema. as_ref ( ) && !stream. worker . is_finished ( ) {
318- Some ( stream. sender . clone ( ) )
353+ Some ( (
354+ stream. sender . clone ( ) ,
355+ std:: sync:: Arc :: clone ( & stream. batches_sent ) ,
356+ ) )
319357 } else {
320358 None
321359 }
322360 } )
323- } ;
361+ }
362+ . unzip ( ) ;
324363
325- let sender = if let Some ( sender) = sender {
326- sender
364+ let ( sender, batches_sent) = if let Some ( sender) = sender {
365+ let counter = batches_sent. ok_or_else ( || {
366+ anyhow:: anyhow!( "Bulk ingest stream state inconsistency: sender present but batches_sent counter missing for table '{table_name}'" )
367+ } ) ?;
368+ ( sender, counter)
327369 } else {
328370 self . end_bulk_ingest_stream_for_table ( table_name) . await ?;
329371 let mut streams = self . bulk_ingest_streams . write ( ) . await ;
330372 let stream = self . spawn_table_bulk_ingest_stream ( table_name, schema. clone ( ) ) ;
331373 let sender = stream. sender . clone ( ) ;
374+ let counter = std:: sync:: Arc :: clone ( & stream. batches_sent ) ;
332375 streams. insert ( table_name. to_string ( ) , stream) ;
333- sender
376+ ( sender, counter )
334377 } ;
335378
336379 for sub_batch in sub_batches {
@@ -339,6 +382,7 @@ impl AdbcSink {
339382 "Bulk ingest stream for table '{table_name}' is no longer available"
340383 )
341384 } ) ?;
385+ batches_sent. fetch_add ( 1 , Ordering :: Relaxed ) ;
342386 }
343387
344388 Ok ( ( ) )
@@ -363,8 +407,25 @@ impl AdbcSink {
363407 guard. drain ( ) . collect ( )
364408 } ;
365409
410+ // Close every stream and wait for its ADBC worker to finish. Continue
411+ // closing remaining streams even if one fails so we don't leave
412+ // detached workers with open connections.
413+ let mut first_err: Option < anyhow:: Error > = None ;
366414 for ( table_name, stream) in streams {
367- stream. close_and_wait ( & table_name) . await ?;
415+ if let Err ( e) = stream. close_and_wait ( & table_name) . await {
416+ tracing:: error!(
417+ table = %table_name,
418+ error = %e,
419+ "Failed to close bulk ingest stream during flush"
420+ ) ;
421+ if first_err. is_none ( ) {
422+ first_err = Some ( e) ;
423+ }
424+ }
425+ }
426+
427+ if let Some ( e) = first_err {
428+ return Err ( e) ;
368429 }
369430
370431 Ok ( ( ) )
@@ -1245,11 +1306,21 @@ impl Sink for AdbcSink {
12451306 let now = chrono:: Utc :: now ( ) . format ( "%Y-%m-%d %H:%M:%S%.3f UTC" ) ;
12461307 tracing:: debug!( "[adbc] {now} | {table_name} | {op_label} | rows: {rows_current}" ) ;
12471308
1248- if self . reuse_bulk_ingest_streams
1249- && matches ! ( & op, InsertOp :: Delete { .. } | InsertOp :: Update { .. } )
1250- {
1251- // Ensure all queued bulk ingest data is flushed before mutation SQL/update flows.
1252- self . end_all_bulk_ingest_streams ( ) . await ?;
1309+ if self . reuse_bulk_ingest_streams {
1310+ let should_flush = match & op {
1311+ InsertOp :: Delete { .. } => true ,
1312+ InsertOp :: Update { .. } => {
1313+ // BulkIngestUpsert sends upsert data through bulk ingest;
1314+ // flush only when flush_stream_before_upsert is set to
1315+ // avoid mixing upsert and later insert rows in one stream.
1316+ self . update_strategy != UpdateStrategy :: BulkIngestUpsert
1317+ || self . flush_stream_before_upsert
1318+ }
1319+ _ => false ,
1320+ } ;
1321+ if should_flush {
1322+ self . end_bulk_ingest_stream_for_table ( table_name) . await ?;
1323+ }
12531324 }
12541325
12551326 match op {
@@ -1304,10 +1375,15 @@ impl Sink for AdbcSink {
13041375 self . staging_merge_update ( & mut conn, table_name, batch, & key_columns) ?;
13051376 }
13061377 UpdateStrategy :: BulkIngestUpsert => {
1307- if self . reuse_bulk_ingest_streams {
1378+ if self . reuse_bulk_ingest_streams && ! self . flush_stream_before_upsert {
13081379 self . send_batch_via_reused_bulk_ingest_stream ( table_name, batch)
13091380 . await ?;
13101381 } else {
1382+ // Use a direct, independent ingest call so the
1383+ // upsert batch is committed on its own and not
1384+ // mixed with subsequent INSERT data in the same
1385+ // bulk_ingest_stream call (which can cause the
1386+ // target to insert duplicate rows).
13111387 let mut conn = self . pool . get ( ) . map_err ( |e| {
13121388 anyhow:: anyhow!( "Failed to get ADBC connection from pool: {e}" )
13131389 } ) ?;
0 commit comments