Skip to content

Commit 9500602

Browse files
lukekimkrinart
andauthored
Fix ADBC Connector build and test (spiceai#9813)
* Improve JSON SODA support * Fix calendar_date to Timestamp(Second), fix clippy match_same_arms, rustfmt * Propagate soda_metadata in file_source() * Remove unused dependency 'arrow-schema' and update json_format description for clarity * fix: include JSON formats in HTTPS is_structured_format check Add json, jsonl, ndjson, ldjson, soda, and socrata to the explicit file_format check in is_structured_format(). Also add json, jsonl, ndjson, ldjson to the URL extension auto-detect check. This ensures that when file_format is set to a JSON-family format on an HTTPS dataset, it correctly routes to ListingTableConnector (which handles json_format, json_pointer, soda_metadata, etc.) instead of falling through to HttpTableProvider. * feat: Add rusqlite dependency and create SQLite tables for ADBC connector tests * fix: pre-create ADBC SQLite tables before load_components The ADBC connector calls GetTableSchema during load_components(), which requires tables to already exist. Previously the tests created tables after load_components(), causing 'no such table' errors and 60s timeouts. - Add rusqlite dev-dependency for direct SQLite table creation - Use tempfile::TempDir for automatic cleanup (even on panic) - Make setup_sqlite_table idempotent with DROP TABLE IF EXISTS - Pre-create tables with schema and seed data before runtime starts * feat: Add driver options, catalog, and schema parameters to ADBC connector * fix: Ensure ADBC driver options are prefixed with 'adbc.' in configuration * fix: Remove redundant note about read-only connection option in README * fix: Ignore ADBC options with empty keys and update SQLite table setup function * fix: Improve ADBC option logging and update SQLite test for prepopulated data * Fix dialect * Lint --------- Co-authored-by: Viktor Yershov <viktor@spice.ai>
1 parent 708465d commit 9500602

7 files changed

Lines changed: 299 additions & 76 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ datafusion-physical-optimizer = { git = "https://github.com/spiceai/datafusion.g
390390
datafusion-spark = { git = "https://github.com/spiceai/datafusion.git", rev = "7bbfa5a179f45173bb2db5df00c0d80619d2eb30" } # spiceai-52
391391
datafusion-substrait = { git = "https://github.com/spiceai/datafusion.git", rev = "7bbfa5a179f45173bb2db5df00c0d80619d2eb30" } # spiceai-52
392392

393-
datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "9748420cbbd92d40369bce45d9e3227ca7cd261a" } # spiceai-52
393+
datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "de767a2a9eb3f48be5e917fce1f864d7588ec6ed" } # spiceai-52
394394

395395
ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "ad88031f002aec4ec63dc1fabd81dfd3cf69fb22" } # spiceai-52
396396
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "ad88031f002aec4ec63dc1fabd81dfd3cf69fb22" } # spiceai-52

bin/spiced/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ cuda = ["runtime/cuda"]
7777
databricks = ["connector-databricks", "runtime/databricks"]
7878
debezium = ["runtime/debezium"]
7979
default = [
80+
"adbc",
8081
"duckdb",
8182
"postgres",
8283
"sqlite",

crates/runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ tempfile.workspace = true
247247
test-framework = { path = "../test-framework" }
248248
tiberius.workspace = true
249249
tokio = { workspace = true, features = ["time", "test-util"] }
250+
rusqlite = { workspace = true }
250251
tracing-opentelemetry.workspace = true
251252
tracing-subscriber.workspace = true
252253
yaml = { path = "../yaml" }

crates/runtime/src/dataconnector/adbc.rs

Lines changed: 204 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ use adbc_core::{Driver as _, LOAD_FLAG_DEFAULT};
2020
use adbc_driver_manager::ManagedDriver;
2121
use async_trait::async_trait;
2222
use datafusion::datasource::TableProvider;
23+
use datafusion::sql::unparser::dialect::{BigQueryDialect, Dialect};
2324
use datafusion_table_providers::adbc::AdbcTableFactory;
2425
use datafusion_table_providers::sql::db_connection_pool::adbcpool::{
2526
ADBCPool, AdbcConnectionPoolBuilder,
2627
};
2728
use snafu::prelude::*;
2829
use std::any::Any;
30+
use std::collections::HashMap;
2931
use std::future::Future;
3032
use std::pin::Pin;
3133
use std::sync::Arc;
@@ -77,6 +79,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
7779

7880
pub struct Adbc {
7981
adbc_factory: AdbcTableFactory<adbc_driver_manager::ManagedDatabase>,
82+
driver_name: String,
8083
}
8184

8285
impl std::fmt::Debug for Adbc {
@@ -114,6 +117,11 @@ const PARAMETERS: &[ParameterSpec] = &[
114117
ParameterSpec::component("password")
115118
.description("Password for database authentication")
116119
.secret(),
120+
ParameterSpec::component("driver_options").description(
121+
"Semicolon-delimited driver-specific database options (e.g., 'key1=value1;key2=value2')",
122+
),
123+
ParameterSpec::component("catalog").description("The catalog for the connection"),
124+
ParameterSpec::component("schema").description("The schema for the connection"),
117125
ParameterSpec::runtime("connection_pool_size")
118126
.description("The maximum number of connections in the connection pool.")
119127
.default("5"),
@@ -144,6 +152,7 @@ impl DataConnectorFactory for AdbcFactory {
144152
source: Box::new(e),
145153
})?;
146154

155+
let driver_name_owned = driver_name.to_string();
147156
let driver_path = params.parameters.get("driver_path").expose().ok();
148157
let driver_location = driver_path.unwrap_or(driver_name).to_string();
149158

@@ -163,7 +172,23 @@ impl DataConnectorFactory for AdbcFactory {
163172

164173
let username = params.parameters.get("username").expose().ok();
165174
let password = params.parameters.get("password").expose().ok();
166-
let db_options = build_db_options(&uri_str, username, password);
175+
let driver_options = params.parameters.get("driver_options").expose().ok();
176+
let db_options = build_db_options(&uri_str, username, password, driver_options);
177+
178+
let catalog = params
179+
.parameters
180+
.get("catalog")
181+
.expose()
182+
.ok()
183+
.map(String::from);
184+
let schema = params
185+
.parameters
186+
.get("schema")
187+
.expose()
188+
.ok()
189+
.map(String::from);
190+
191+
let conn_options = build_conn_options(catalog.as_deref(), schema.as_deref());
167192

168193
let parse_pool_param = |name: &str| -> std::result::Result<Option<u32>, Error> {
169194
match params.parameters.get(name).expose().ok() {
@@ -232,9 +257,15 @@ impl DataConnectorFactory for AdbcFactory {
232257
},
233258
)?;
234259

235-
let pool = AdbcConnectionPoolBuilder::new(db)
260+
let mut pool_builder = AdbcConnectionPoolBuilder::new(db)
236261
.with_max_size(pool_size)
237-
.with_min_idle(pool_min_idle)
262+
.with_min_idle(pool_min_idle);
263+
264+
if let Some(conn_opts) = conn_options {
265+
pool_builder = pool_builder.with_conn_options(conn_opts);
266+
}
267+
268+
let pool = pool_builder
238269
.build()
239270
.context(UnableToCreateConnectionPoolSnafu {
240271
driver_location,
@@ -257,7 +288,10 @@ impl DataConnectorFactory for AdbcFactory {
257288

258289
let adbc_factory = AdbcTableFactory::new(pool);
259290

260-
Ok(Arc::new(Adbc { adbc_factory }) as Arc<dyn DataConnector>)
291+
Ok(Arc::new(Adbc {
292+
adbc_factory,
293+
driver_name: driver_name_owned,
294+
}) as Arc<dyn DataConnector>)
261295
})
262296
}
263297

@@ -275,6 +309,7 @@ fn build_db_options(
275309
uri: &str,
276310
username: Option<&str>,
277311
password: Option<&str>,
312+
driver_options: Option<&str>,
278313
) -> Vec<(OptionDatabase, adbc_core::options::OptionValue)> {
279314
let mut opts = vec![(OptionDatabase::Uri, uri.into())];
280315
if let Some(u) = username {
@@ -283,9 +318,66 @@ fn build_db_options(
283318
if let Some(p) = password {
284319
opts.push((OptionDatabase::Password, p.into()));
285320
}
321+
if let Some(options_str) = driver_options {
322+
for pair in options_str.split(';') {
323+
let pair = pair.trim();
324+
if pair.is_empty() {
325+
continue;
326+
}
327+
if let Some((key, value)) = pair.split_once('=') {
328+
let key = key.trim();
329+
if key.is_empty() {
330+
tracing::warn!("Ignoring ADBC driver option with empty key");
331+
continue;
332+
}
333+
let key = if key.starts_with("adbc.") {
334+
key.to_string()
335+
} else {
336+
format!("adbc.{key}")
337+
};
338+
opts.push((OptionDatabase::Other(key), value.trim().into()));
339+
} else {
340+
tracing::warn!("Ignoring malformed ADBC driver option (expected 'key=value')");
341+
}
342+
}
343+
}
286344
opts
287345
}
288346

347+
/// Builds connection-level options from connector parameters.
348+
fn build_conn_options(
349+
catalog: Option<&str>,
350+
schema: Option<&str>,
351+
) -> Option<HashMap<String, String>> {
352+
let mut opts = HashMap::new();
353+
354+
if let Some(catalog) = catalog {
355+
opts.insert(
356+
adbc_core::options::OptionConnection::CurrentCatalog
357+
.as_ref()
358+
.to_string(),
359+
catalog.to_string(),
360+
);
361+
}
362+
363+
if let Some(schema) = schema {
364+
opts.insert(
365+
adbc_core::options::OptionConnection::CurrentSchema
366+
.as_ref()
367+
.to_string(),
368+
schema.to_string(),
369+
);
370+
}
371+
372+
if opts.is_empty() { None } else { Some(opts) }
373+
}
374+
fn dialect_for_driver(driver_name: &str) -> Option<Arc<dyn Dialect + Send + Sync>> {
375+
match driver_name {
376+
"bigquery" => Some(Arc::new(BigQueryDialect::new())),
377+
_ => None,
378+
}
379+
}
380+
289381
register_data_connector!("adbc", AdbcFactory);
290382

291383
#[async_trait]
@@ -299,9 +391,9 @@ impl DataConnector for Adbc {
299391
dataset: &Dataset,
300392
) -> super::DataConnectorResult<Arc<dyn TableProvider>> {
301393
let table_reference = dataset.path().into();
302-
394+
let dialect = dialect_for_driver(&self.driver_name);
303395
self.adbc_factory
304-
.table_provider(table_reference, None)
396+
.table_provider(table_reference, dialect)
305397
.await
306398
.map_err(|e| DataConnectorError::UnableToGetReadProvider {
307399
dataconnector: "adbc".to_string(),
@@ -315,10 +407,11 @@ impl DataConnector for Adbc {
315407
dataset: &Dataset,
316408
) -> Option<super::DataConnectorResult<Arc<dyn TableProvider>>> {
317409
let table_reference = dataset.path().into();
410+
let dialect = dialect_for_driver(&self.driver_name);
318411

319412
Some(
320413
self.adbc_factory
321-
.read_write_table_provider(table_reference, None)
414+
.read_write_table_provider(table_reference, dialect)
322415
.await
323416
.map_err(|e| DataConnectorError::UnableToGetReadWriteProvider {
324417
dataconnector: "adbc".to_string(),
@@ -356,6 +449,9 @@ mod tests {
356449
assert!(param_names.contains(&"uri"));
357450
assert!(param_names.contains(&"username"));
358451
assert!(param_names.contains(&"password"));
452+
assert!(param_names.contains(&"driver_options"));
453+
assert!(param_names.contains(&"catalog"));
454+
assert!(param_names.contains(&"schema"));
359455
assert!(param_names.contains(&"connection_pool_size"));
360456
assert!(param_names.contains(&"connection_pool_min_idle"));
361457
}
@@ -383,7 +479,7 @@ mod tests {
383479

384480
#[test]
385481
fn test_build_db_options_uri_only() {
386-
let opts = build_db_options("file:test.db", None, None);
482+
let opts = build_db_options("file:test.db", None, None, None);
387483
assert_eq!(opts.len(), 1);
388484
assert_eq!(opts[0].0, OptionDatabase::Uri);
389485
assert!(
@@ -393,7 +489,7 @@ mod tests {
393489

394490
#[test]
395491
fn test_build_db_options_with_username_password() {
396-
let opts = build_db_options("postgres://host/db", Some("admin"), Some("secret"));
492+
let opts = build_db_options("postgres://host/db", Some("admin"), Some("secret"), None);
397493
assert_eq!(opts.len(), 3);
398494

399495
assert_eq!(opts[0].0, OptionDatabase::Uri);
@@ -410,10 +506,108 @@ mod tests {
410506

411507
#[test]
412508
fn test_build_db_options_username_only() {
413-
let opts = build_db_options("sqlite:test.db", Some("user"), None);
509+
let opts = build_db_options("sqlite:test.db", Some("user"), None, None);
414510
assert_eq!(opts.len(), 2);
415511
assert_eq!(opts[0].0, OptionDatabase::Uri);
416512
assert_eq!(opts[1].0, OptionDatabase::Username);
417513
assert!(matches!(&opts[1].1, adbc_core::options::OptionValue::String(s) if s == "user"));
418514
}
515+
516+
#[test]
517+
fn test_build_db_options_with_driver_options_unprefixed() {
518+
let opts = build_db_options(
519+
"uri://db",
520+
None,
521+
None,
522+
Some("snowflake.sql.db=MY_DB;snowflake.sql.schema=PUBLIC"),
523+
);
524+
assert_eq!(opts.len(), 3);
525+
assert_eq!(opts[0].0, OptionDatabase::Uri);
526+
assert_eq!(
527+
opts[1].0,
528+
OptionDatabase::Other("adbc.snowflake.sql.db".to_string())
529+
);
530+
assert!(matches!(&opts[1].1, adbc_core::options::OptionValue::String(s) if s == "MY_DB"));
531+
assert_eq!(
532+
opts[2].0,
533+
OptionDatabase::Other("adbc.snowflake.sql.schema".to_string())
534+
);
535+
assert!(matches!(&opts[2].1, adbc_core::options::OptionValue::String(s) if s == "PUBLIC"));
536+
}
537+
538+
#[test]
539+
fn test_build_db_options_with_driver_options_prefixed() {
540+
let opts = build_db_options(
541+
"uri://db",
542+
None,
543+
None,
544+
Some("adbc.snowflake.sql.db=MY_DB;adbc.snowflake.sql.schema=PUBLIC"),
545+
);
546+
assert_eq!(opts.len(), 3);
547+
assert_eq!(
548+
opts[1].0,
549+
OptionDatabase::Other("adbc.snowflake.sql.db".to_string())
550+
);
551+
assert_eq!(
552+
opts[2].0,
553+
OptionDatabase::Other("adbc.snowflake.sql.schema".to_string())
554+
);
555+
}
556+
557+
#[test]
558+
fn test_build_db_options_driver_options_trailing_semicolon() {
559+
let opts = build_db_options("uri://db", None, None, Some("key=value;"));
560+
assert_eq!(opts.len(), 2);
561+
assert_eq!(opts[1].0, OptionDatabase::Other("adbc.key".to_string()));
562+
assert!(matches!(&opts[1].1, adbc_core::options::OptionValue::String(s) if s == "value"));
563+
}
564+
565+
#[test]
566+
fn test_build_db_options_driver_options_malformed_ignored() {
567+
let opts = build_db_options(
568+
"uri://db",
569+
None,
570+
None,
571+
Some("good=val;bad_no_equals;another=ok"),
572+
);
573+
assert_eq!(opts.len(), 3); // uri + good + another (bad_no_equals skipped)
574+
}
575+
576+
#[test]
577+
fn test_build_db_options_driver_options_empty_key_ignored() {
578+
let opts = build_db_options("uri://db", None, None, Some("=value;good=ok"));
579+
assert_eq!(opts.len(), 2); // uri + good (empty key skipped)
580+
assert_eq!(opts[1].0, OptionDatabase::Other("adbc.good".to_string()));
581+
}
582+
583+
#[test]
584+
fn test_build_conn_options_none_when_empty() {
585+
let opts = build_conn_options(None, None);
586+
assert!(opts.is_none());
587+
}
588+
589+
#[test]
590+
fn test_build_conn_options_both() {
591+
let opts =
592+
build_conn_options(Some("my_catalog"), Some("my_schema")).expect("should have options");
593+
assert_eq!(opts.len(), 2);
594+
assert_eq!(
595+
opts.get("adbc.connection.catalog"),
596+
Some(&"my_catalog".to_string())
597+
);
598+
assert_eq!(
599+
opts.get("adbc.connection.db_schema"),
600+
Some(&"my_schema".to_string())
601+
);
602+
}
603+
604+
#[test]
605+
fn test_build_conn_options_catalog_only() {
606+
let opts = build_conn_options(Some("cat"), None).expect("should have options");
607+
assert_eq!(opts.len(), 1);
608+
assert_eq!(
609+
opts.get("adbc.connection.catalog"),
610+
Some(&"cat".to_string())
611+
);
612+
}
419613
}

0 commit comments

Comments
 (0)