Skip to content

Commit 86cd13c

Browse files
feat: preliminary support odbc (datafusion-contrib#177)
* feat: preliminary support odbc * feat: add sqlite odbc example * chore: clippy lint --------- Co-authored-by: Phillip LeBlanc <phillip@leblanc.tech>
1 parent 72e8e58 commit 86cd13c

14 files changed

Lines changed: 611 additions & 29 deletions

File tree

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ serde_json = "1.0.124"
6262
snafu = "0.8.3"
6363
time = "0.3.36"
6464
tokio = { version = "1.38.0", features = ["macros", "fs"] }
65+
tokio-util = "0.7.12"
6566
tokio-postgres = { version = "0.7.10", features = [
6667
"with-chrono-0_4",
6768
"with-uuid-1",
@@ -87,6 +88,9 @@ dyn-clone = { version = "1.0.17", optional = true }
8788
geo-types = "0.7.13"
8889
fundu = "2.0.1"
8990
dashmap = "6.1.0"
91+
odbc-api = { version = "9.0.0", optional = true }
92+
arrow-odbc = { version = "13.0.0", optional = true }
93+
sha2 = "0.10.8"
9094

9195
[dev-dependencies]
9296
anyhow = "1.0.86"
@@ -135,8 +139,10 @@ flight = [
135139
"dep:serde",
136140
"dep:tonic",
137141
]
142+
odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", "dep:dyn-clone"]
138143
federation = ["dep:datafusion-federation"]
139144
duckdb-federation = ["duckdb", "federation"]
140145
sqlite-federation = ["sqlite", "federation"]
141146
postgres-federation = ["postgres", "federation"]
142147
mysql-federation = ["mysql", "federation"]
148+
odbc-federation = ["odbc", "federation"]

examples/odbc_sqlite.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use datafusion::prelude::SessionContext;
4+
use datafusion::sql::TableReference;
5+
use datafusion_table_providers::{
6+
odbc::ODBCTableFactory, sql::db_connection_pool::odbcpool::ODBCPool,
7+
util::secrets::to_secret_map,
8+
};
9+
10+
/// This example demonstrates how to:
11+
/// 1. Create a SQLite ODBC connection pool
12+
/// 2. Create and use ODBCTableFactory to generate TableProvider
13+
/// 3. Register TableProvider with DataFusion
14+
/// 4. Use SQL queries to access SQLite ODBC table data
15+
#[tokio::main]
16+
async fn main() {
17+
// Create SQLite ODBC connection pool
18+
let params = to_secret_map(HashMap::from([(
19+
"connection_string".to_owned(),
20+
"driver=SQLite3;database=examples/sqlite_example.db;"
21+
.to_owned(),
22+
)]));
23+
let odbc_pool =
24+
Arc::new(ODBCPool::new(params).expect("unable to create SQLite ODBC connection pool"));
25+
26+
// Create SQLite ODBC table provider factory
27+
// Used to generate TableProvider instances that can read SQLite ODBC table data
28+
let table_factory = ODBCTableFactory::new(odbc_pool.clone());
29+
30+
// Create DataFusion session context
31+
let ctx = SessionContext::new();
32+
33+
// Demonstrate direct table provider registration
34+
// This method registers the table in the default catalog
35+
// Here we register the SQLite ODBC "companies" table as "companies_v2"
36+
ctx.register_table(
37+
"companies_v2",
38+
table_factory
39+
.table_provider(TableReference::bare("companies"), None)
40+
.await
41+
.expect("failed to register table provider"),
42+
)
43+
.expect("failed to register table");
44+
45+
// Query Example 1: Query the renamed table through default catalog
46+
let df = ctx
47+
.sql("SELECT * FROM datafusion.public.companies_v2")
48+
.await
49+
.expect("select failed");
50+
df.show().await.expect("show failed");
51+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub mod duckdb;
88
pub mod flight;
99
#[cfg(feature = "mysql")]
1010
pub mod mysql;
11+
#[cfg(feature = "odbc")]
12+
pub mod odbc;
1113
#[cfg(feature = "postgres")]
1214
pub mod postgres;
1315
#[cfg(feature = "sqlite")]

src/odbc.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2024 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use crate::sql::db_connection_pool::dbconnection::odbcconn::ODBCDbConnectionPool;
18+
use crate::sql::{
19+
db_connection_pool as db_connection_pool_datafusion,
20+
sql_provider_datafusion::{Engine, SqlTable},
21+
};
22+
use arrow::datatypes::SchemaRef;
23+
use datafusion::error::DataFusionError;
24+
use datafusion::{datasource::TableProvider, sql::TableReference};
25+
use snafu::prelude::*;
26+
use std::sync::Arc;
27+
28+
#[derive(Debug, Snafu)]
29+
pub enum Error {
30+
#[snafu(display("DbConnectionError: {source}"))]
31+
DbConnectionError {
32+
source: db_connection_pool_datafusion::dbconnection::GenericError,
33+
},
34+
#[snafu(display("The table '{table_name}' doesn't exist in the Postgres server"))]
35+
TableDoesntExist { table_name: String },
36+
37+
#[snafu(display("Unable to get a DB connection from the pool: {source}"))]
38+
UnableToGetConnectionFromPool {
39+
source: db_connection_pool_datafusion::Error,
40+
},
41+
42+
#[snafu(display("Unable to get schema: {source}"))]
43+
UnableToGetSchema {
44+
source: db_connection_pool_datafusion::dbconnection::Error,
45+
},
46+
47+
#[snafu(display("Unable to generate SQL: {source}"))]
48+
UnableToGenerateSQL { source: DataFusionError },
49+
}
50+
51+
type Result<T, E = Error> = std::result::Result<T, E>;
52+
53+
pub struct ODBCTableFactory<'a> {
54+
pool: Arc<ODBCDbConnectionPool<'a>>,
55+
}
56+
57+
impl<'a> ODBCTableFactory<'a>
58+
where
59+
'a: 'static,
60+
{
61+
#[must_use]
62+
pub fn new(pool: Arc<ODBCDbConnectionPool<'a>>) -> Self {
63+
Self { pool }
64+
}
65+
66+
pub async fn table_provider(
67+
&self,
68+
table_reference: TableReference,
69+
_schema: Option<SchemaRef>,
70+
) -> Result<Arc<dyn TableProvider + 'static>, Box<dyn std::error::Error + Send + Sync>> {
71+
let pool = Arc::clone(&self.pool);
72+
let dyn_pool: Arc<ODBCDbConnectionPool<'a>> = pool;
73+
74+
let table = SqlTable::new("odbc", &dyn_pool, table_reference, Some(Engine::ODBC))
75+
.await
76+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
77+
78+
let table_provider = Arc::new(table);
79+
80+
#[cfg(feature = "odbc-federation")]
81+
let table_provider = Arc::new(
82+
table_provider
83+
.create_federated_table_provider()
84+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
85+
);
86+
87+
Ok(table_provider)
88+
}
89+
}

src/sql/arrow_sql_gen/postgres/composite.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ fn composite_type_fields(type_: &Type) -> &[Field] {
8080
}
8181
}
8282

83-
impl<'a> CompositeType<'a> {
83+
impl CompositeType<'_> {
8484
/// Returns information about the fields of the composite type.
8585
#[must_use]
8686
pub fn fields(&self) -> &[Field] {
@@ -180,7 +180,7 @@ impl<'a> CompositeTypeRanges<'a> {
180180
}
181181

182182
#[allow(clippy::cast_sign_loss)]
183-
impl<'a> FallibleIterator for CompositeTypeRanges<'a> {
183+
impl FallibleIterator for CompositeTypeRanges<'_> {
184184
type Item = Option<std::ops::Range<usize>>;
185185
type Error = std::io::Error;
186186

src/sql/db_connection_pool/dbconnection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use snafu::prelude::*;
99
pub mod duckdbconn;
1010
#[cfg(feature = "mysql")]
1111
pub mod mysqlconn;
12+
#[cfg(feature = "odbc")]
13+
pub mod odbcconn;
1214
#[cfg(feature = "postgres")]
1315
pub mod postgresconn;
1416
#[cfg(feature = "sqlite")]

0 commit comments

Comments
 (0)