Skip to content

Commit 54d4b89

Browse files
authored
fix: Properly set ADBC catalog paths when specified (#141)
* fix: Properly set ADBC catalog paths when specified
* fix build
1 parent b8b1ed8 commit 54d4b89

5 files changed

Lines changed: 106 additions & 11 deletions

File tree

crates/adbc_client/src/lib.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,14 @@ impl AdbcConnection {
145145
pub fn bulk_ingest(
146146
&mut self,
147147
target_table: &str,
148+
target_db_catalog: Option<&str>,
148149
target_db_schema: Option<&str>,
149150
mode: options::IngestMode,
150151
batch: RecordBatch,
151152
) -> Result<Option<i64>> {
152153
self.bulk_ingest_stream(
153154
target_table,
155+
target_db_catalog,
154156
target_db_schema,
155157
mode,
156158
Box::new(arrow_array::RecordBatchIterator::new(
@@ -168,6 +170,7 @@ impl AdbcConnection {
168170
pub fn bulk_ingest_stream(
169171
&mut self,
170172
target_table: &str,
173+
target_db_catalog: Option<&str>,
171174
target_db_schema: Option<&str>,
172175
mode: options::IngestMode,
173176
reader: Box<dyn arrow_array::RecordBatchReader + Send>,
@@ -184,6 +187,16 @@ impl AdbcConnection {
184187
reason: format!("Failed to set target table: {e}"),
185188
})?;
186189

190+
if let Some(catalog) = target_db_catalog {
191+
stmt.set_option(
192+
options::OptionStatement::TargetCatalog,
193+
OptionValue::from(catalog),
194+
)
195+
.map_err(|e| Error::ExecuteQuery {
196+
reason: format!("Failed to set target db catalog: {e}"),
197+
})?;
198+
}
199+
187200
if let Some(schema) = target_db_schema {
188201
stmt.set_option(
189202
options::OptionStatement::TargetDbSchema,
@@ -204,7 +217,9 @@ impl AdbcConnection {
204217
})?;
205218

206219
stmt.execute_update().map_err(|e| Error::ExecuteQuery {
207-
reason: format!("Bulk ingest execution failed: {e}"),
220+
reason: format!(
221+
"Bulk ingest execution failed for target_table='{target_table}', target_db_catalog={target_db_catalog:?}, target_db_schema={target_db_schema:?}: {e}"
222+
),
208223
})
209224
}
210225
}

crates/etl/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ const INTERNAL_COLUMNS: &[&str] = &["_op", "_op_index"];
5252
/// Smaller input batches from a [`ReadResult`] are concatenated together until
5353
/// this threshold is reached, and larger input batches are split so no output
5454
/// batch exceeds this size.
55-
const TARGET_BATCH_ROWS: usize = 8_192 * 2;
55+
const TARGET_BATCH_ROWS: usize = 8_192 * 4;
5656

5757
/// Maximum number of in-flight sink writes allowed per table task when the
5858
/// current segment set is insert-only.

crates/etl/src/main.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ struct Cli {
7979
#[arg(long)]
8080
adbc_uri: Option<String>,
8181

82+
/// Optional target database catalog for ADBC bulk ingest inserts
83+
#[arg(long)]
84+
adbc_catalog: Option<String>,
85+
8286
/// Optional target database schema for bulk ingest
8387
#[arg(long)]
8488
adbc_schema: Option<String>,
@@ -173,7 +177,12 @@ async fn main() -> anyhow::Result<()> {
173177
});
174178
}
175179

176-
let adbc_sink = Arc::new(AdbcSink::new(driver, db_kwargs, cli.adbc_schema.clone())?);
180+
let adbc_sink = Arc::new(AdbcSink::new(
181+
driver,
182+
db_kwargs,
183+
cli.adbc_catalog.clone(),
184+
cli.adbc_schema.clone(),
185+
)?);
177186

178187
(
179188
adbc_sink.clone() as Arc<dyn Sink>,
@@ -242,6 +251,7 @@ async fn main() -> anyhow::Result<()> {
242251
prefix = %cli.prefix,
243252
target = %target_kind,
244253
adbc_driver = ?cli.adbc_driver,
254+
adbc_catalog = ?cli.adbc_catalog,
245255
adbc_schema = ?cli.adbc_schema,
246256
adbc_create_tables = cli.adbc_create_tables,
247257
target_prefix = %cli.target_prefix,

crates/etl/src/sink/adbc.rs

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const MAX_ADBC_INGEST_BATCH_BYTES_ENV: &str = "SPICEBENCH_ADBC_MAX_INGEST_BATCH_
3737
/// SQL statements derived from key columns in each batch.
3838
pub struct AdbcSink {
3939
conn: Mutex<AdbcConnection>,
40+
target_db_catalog: Option<String>,
4041
target_db_schema: Option<String>,
4142
}
4243

@@ -53,13 +54,15 @@ impl AdbcSink {
5354
pub fn new(
5455
driver_name: &str,
5556
db_kwargs: HashMap<String, serde_json::Value>,
57+
target_db_catalog: Option<String>,
5658
target_db_schema: Option<String>,
5759
) -> anyhow::Result<Self> {
5860
let conn = AdbcConnection::create(driver_name, db_kwargs)
5961
.map_err(|e| anyhow::anyhow!("Failed to create ADBC connection: {e}"))?;
6062

6163
Ok(Self {
6264
conn: Mutex::new(conn),
65+
target_db_catalog,
6366
target_db_schema,
6467
})
6568
}
@@ -106,13 +109,45 @@ impl AdbcSink {
106109
}
107110

108111
fn target_table_identifier(&self, table_name: &str) -> String {
109-
let table_ident = Self::quote_identifier(table_name);
112+
let mut parts = Vec::with_capacity(3);
113+
114+
if let Some(catalog) = self.target_db_catalog.as_deref() {
115+
if !catalog.is_empty() {
116+
parts.push(Self::quote_identifier(catalog));
117+
}
118+
}
119+
120+
if let Some(schema) = self.target_db_schema.as_deref() {
121+
if !schema.is_empty() {
122+
parts.push(Self::quote_identifier(schema));
123+
}
124+
}
125+
126+
parts.push(Self::quote_identifier(table_name));
127+
parts.join(".")
128+
}
129+
130+
fn target_table_ingest_name(&self, table_name: &str) -> String {
131+
self.target_table_identifier_unquoted(table_name)
132+
}
133+
134+
fn target_table_identifier_unquoted(&self, table_name: &str) -> String {
135+
let mut parts = Vec::with_capacity(3);
136+
137+
if let Some(catalog) = self.target_db_catalog.as_deref() {
138+
if !catalog.is_empty() {
139+
parts.push(catalog.to_string());
140+
}
141+
}
142+
110143
if let Some(schema) = self.target_db_schema.as_deref() {
111144
if !schema.is_empty() {
112-
return format!("{}.{}", Self::quote_identifier(schema), table_ident);
145+
parts.push(schema.to_string());
113146
}
114147
}
115-
table_ident
148+
149+
parts.push(table_name.to_string());
150+
parts.join(".")
116151
}
117152

118153
fn create_table_sql(&self, table_name: &str, schema: &Schema) -> anyhow::Result<String> {
@@ -129,7 +164,7 @@ impl AdbcSink {
129164
.join(", ");
130165

131166
Ok(format!(
132-
"CREATE TABLE IF NOT EXISTS spicebench.bench.{} ({columns})",
167+
"CREATE TABLE IF NOT EXISTS {} ({columns})",
133168
self.target_table_identifier(table_name)
134169
))
135170
}
@@ -222,9 +257,20 @@ impl AdbcSink {
222257
table_name: &str,
223258
batch: RecordBatch,
224259
) -> anyhow::Result<()> {
260+
let ingest_table_name = self.target_table_ingest_name(table_name);
261+
let target_db_catalog = self
262+
.target_db_catalog
263+
.as_deref()
264+
.and_then(|catalog| (!catalog.is_empty()).then_some(catalog));
265+
let target_db_schema = self
266+
.target_db_schema
267+
.as_deref()
268+
.and_then(|schema| (!schema.is_empty()).then_some(schema));
269+
225270
match conn.bulk_ingest(
226271
table_name,
227-
self.target_db_schema.as_deref(),
272+
target_db_catalog,
273+
target_db_schema,
228274
IngestMode::CreateAppend,
229275
batch.clone(),
230276
) {
@@ -234,7 +280,7 @@ impl AdbcSink {
234280
if Self::is_message_too_large_error(&message) {
235281
if batch.num_rows() <= 1 {
236282
anyhow::bail!(
237-
"ADBC bulk ingest failed for '{table_name}': single-row batch still exceeds FlightSQL message limit: {message}. Configure a larger FlightSQL max message size (e.g. adbc.flight.sql.client_option.with_max_msg_size)."
283+
"ADBC bulk ingest failed for source table '{table_name}' (ingest target '{ingest_table_name}'): single-row batch still exceeds FlightSQL message limit: {message}. Configure a larger FlightSQL max message size (e.g. adbc.flight.sql.client_option.with_max_msg_size)."
238284
);
239285
}
240286

@@ -245,7 +291,9 @@ impl AdbcSink {
245291
self.bulk_ingest_with_retry(conn, table_name, right)?;
246292
Ok(())
247293
} else {
248-
anyhow::bail!("ADBC bulk ingest failed for '{table_name}': {message}")
294+
anyhow::bail!(
295+
"ADBC bulk ingest failed for source table '{table_name}' (ingest target '{ingest_table_name}'): {message}"
296+
)
249297
}
250298
}
251299
}

src/main.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ fn s3_hive_target_prefix(common: &CommonArgs, scenario_name: &str, run_id: uuid:
6464
format!("{target_prefix}/{scenario_name}/{run_id}")
6565
}
6666

67+
fn infer_adbc_target_namespace(
68+
catalog_namespace: Option<&str>,
69+
) -> (Option<String>, Option<String>) {
70+
let Some(namespace) = catalog_namespace.map(str::trim).filter(|ns| !ns.is_empty()) else {
71+
return (None, None);
72+
};
73+
74+
let mut parts = namespace.rsplitn(2, '.');
75+
let schema = parts.next().map(str::trim).filter(|v| !v.is_empty());
76+
let catalog = parts.next().map(str::trim).filter(|v| !v.is_empty());
77+
78+
(catalog.map(str::to_string), schema.map(str::to_string))
79+
}
80+
6781
async fn run_benchmark(
6882
common: &CommonArgs,
6983
system_adapter_client: Arc<Mutex<system_adapter_protocol::Client>>,
@@ -203,7 +217,15 @@ async fn run_benchmark(
203217
});
204218
}
205219

206-
let adbc_sink = Arc::new(AdbcSink::new(&driver_name, db_kwargs, None)?);
220+
let (target_db_catalog, target_db_schema) =
221+
infer_adbc_target_namespace(setup_response.catalog_namespace.as_deref());
222+
223+
let adbc_sink = Arc::new(AdbcSink::new(
224+
&driver_name,
225+
db_kwargs,
226+
target_db_catalog,
227+
target_db_schema,
228+
)?);
207229

208230
setup_response_for_run = Some(setup_response);
209231

0 commit comments

Comments
 (0)