@@ -15,9 +15,10 @@ limitations under the License.
1515*/
1616
1717use std:: collections:: { HashMap , HashSet } ;
18+ use std:: sync:: Arc ;
1819use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
1920use std:: time:: Instant ;
20- use tokio:: sync:: { RwLock , mpsc} ;
21+ use tokio:: sync:: { Mutex , RwLock , mpsc} ;
2122use tokio:: task:: JoinHandle ;
2223
2324use adbc_client:: {
@@ -28,7 +29,8 @@ use arrow::datatypes::{DataType, Schema};
2829use arrow:: record_batch:: RecordBatchIterator ;
2930use arrow_cast:: display:: array_value_to_string;
3031use async_trait:: async_trait;
31- use system_adapter_protocol:: DatasetConfig ;
32+ use system_adapter_protocol:: { Client as SystemAdapterClient , DatasetConfig } ;
33+ use uuid:: Uuid ;
3234
3335use super :: { InsertOp , Sink } ;
3436
@@ -174,6 +176,8 @@ pub struct AdbcSink {
174176 reuse_bulk_ingest_streams : bool ,
175177 flush_stream_before_upsert : bool ,
176178 bulk_ingest_stream_buffer : usize ,
179+ /// Optional system adapter client, paired with the run ID, used to create staging tables.
180+ staging_adapter : Option < ( Arc < Mutex < SystemAdapterClient > > , Uuid ) > ,
177181}
178182
179183impl AdbcSink {
@@ -230,11 +234,15 @@ impl AdbcSink {
230234 }
231235
232236 /// Creates a new [`AdbcSink`] backed by a connection pool.
237+ ///
238+ /// When a system adapter client and run ID are provided, the `StagingTable`
239+ /// update strategy will use the adapter to create staging tables remotely.
233240 pub fn new (
234241 driver_name : & str ,
235242 db_kwargs : HashMap < String , serde_json:: Value > ,
236243 target_db_catalog : Option < String > ,
237244 target_db_schema : Option < String > ,
245+ staging_adapter : Option < ( Arc < Mutex < SystemAdapterClient > > , Uuid ) > ,
238246 ) -> anyhow:: Result < Self > {
239247 let update_strategy = UpdateStrategy :: from_env ( ) ?;
240248 let pool_size = Self :: pool_size ( ) ;
@@ -269,6 +277,7 @@ impl AdbcSink {
269277 reuse_bulk_ingest_streams,
270278 flush_stream_before_upsert,
271279 bulk_ingest_stream_buffer,
280+ staging_adapter,
272281 } )
273282 }
274283
@@ -1148,10 +1157,11 @@ impl AdbcSink {
11481157
11491158 /// Perform an UPDATE via a temporary staging table:
11501159 ///
1151- /// 1. Bulk-ingest the update batch into a staging table.
1152- /// 2. `MERGE INTO target USING staging ON … WHEN MATCHED THEN UPDATE SET …`
1153- /// 3. `DROP TABLE staging`.
1154- fn staging_merge_update (
1160+ /// 1. If a system adapter is configured, create the staging table through it (same schema/partitioning as target).
1161+ /// 2. Bulk-ingest the update batch into the staging table.
1162+ /// 3. `MERGE INTO target USING staging ON … WHEN MATCHED THEN UPDATE SET …`
1163+ /// 4. `DROP TABLE staging` (best-effort SQL drop; runs even if the merge fails).
1164+ async fn staging_merge_update (
11551165 & self ,
11561166 conn : & mut AdbcConnection ,
11571167 table_name : & str ,
@@ -1182,15 +1192,30 @@ impl AdbcSink {
11821192 . as_millis ( ) ;
11831193 let staging_table = format ! ( "_spicebench_stg_{table_name}_{ts}" ) ;
11841194
1185- // 1. Bulk-ingest batch into the staging table.
1195+ // 1. Create the staging table via the system adapter (if available).
1196+ if let Some ( ( client, run_id) ) = & self . staging_adapter {
1197+ client
1198+ . lock ( )
1199+ . await
1200+ . create_staging_table ( * run_id, table_name, & staging_table)
1201+ . await
1202+ . map_err ( |e| {
1203+ anyhow:: anyhow!(
1204+ "Failed to create staging table '{staging_table}' for '{table_name}': {e}"
1205+ )
1206+ } ) ?;
1207+ }
1208+
1209+ // 2. Bulk-ingest batch into the staging table.
11861210 if let Err ( e) = self . ingest_insert_batch ( conn, & staging_table, batch) {
1187- self . drop_staging_table ( conn, & staging_table) ;
1211+ self . drop_staging_table_best_effort ( conn, & staging_table)
1212+ . await ;
11881213 return Err ( e. context ( format ! (
11891214 "Failed to ingest update data into staging table '{staging_table}'"
11901215 ) ) ) ;
11911216 }
11921217
1193- // 2 . MERGE INTO target from staging.
1218+ // 3 . MERGE INTO target from staging.
11941219 let merge_sql = Self :: build_staging_merge_sql (
11951220 & self . target_table_identifier ( table_name) ,
11961221 & self . target_table_identifier ( & staging_table) ,
@@ -1202,8 +1227,9 @@ impl AdbcSink {
12021227 . execute_update ( & merge_sql)
12031228 . map_err ( |e| anyhow:: anyhow!( "MERGE INTO update failed for '{table_name}': {e}" ) ) ;
12041229
1205- // 3. Drop staging table (always, even on merge failure).
1206- self . drop_staging_table ( conn, & staging_table) ;
1230+ // 4. Drop staging table (always, even on merge failure).
1231+ self . drop_staging_table_best_effort ( conn, & staging_table)
1232+ . await ;
12071233
12081234 merge_result?;
12091235 tracing:: debug!(
@@ -1215,8 +1241,8 @@ impl AdbcSink {
12151241 Ok ( ( ) )
12161242 }
12171243
1218- /// Best-effort drop of a staging table.
1219- fn drop_staging_table ( & self , conn : & mut AdbcConnection , staging_table : & str ) {
1244+ /// Best-effort drop of a staging table via SQL `DROP TABLE IF EXISTS`.
1245+ async fn drop_staging_table_best_effort ( & self , conn : & mut AdbcConnection , staging_table : & str ) {
12201246 let drop_sql = format ! (
12211247 "DROP TABLE IF EXISTS {}" ,
12221248 self . target_table_identifier( staging_table)
@@ -1372,7 +1398,8 @@ impl Sink for AdbcSink {
13721398 let mut conn = self . pool . get ( ) . map_err ( |e| {
13731399 anyhow:: anyhow!( "Failed to get ADBC connection from pool: {e}" )
13741400 } ) ?;
1375- self . staging_merge_update ( & mut conn, table_name, batch, & key_columns) ?;
1401+ self . staging_merge_update ( & mut conn, table_name, batch, & key_columns)
1402+ . await ?;
13761403 }
13771404 UpdateStrategy :: BulkIngestUpsert => {
13781405 if self . reuse_bulk_ingest_streams && !self . flush_stream_before_upsert {
0 commit comments