Skip to content

Commit 2c1b888

Browse files
committed
feat: Add Oracle table provider with rust-oracle driver
Implement Oracle support using rust-oracle (ODPI-C) and bb8 connection pooling. Includes comprehensive type mappings (NUMBER, DATE, TIMESTAMP, CLOB, BLOB, RAW), schema inference, and 12 integration tests. Tested against Oracle Database 23c Free. Note: This is my first contribution to the project. Feedback welcome! :)
1 parent 8b277b5 commit 2c1b888

15 files changed

Lines changed: 1917 additions & 0 deletions

File tree

README.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ let ctx = SessionContext::with_state(state);
2222

2323
- PostgreSQL
2424
- MySQL
25+
- Oracle
2526
- SQLite
2627
- ClickHouse
2728
- DuckDB
@@ -165,6 +166,57 @@ EOF
165166
cargo run -p datafusion-table-providers --example mysql --features mysql
166167
```
167168

169+
### Oracle
170+
171+
In order to run the Oracle example, you need to have an Oracle database server running. You can use the following command to start an Oracle Free server in a Docker container the example can use:
172+
173+
```bash
174+
docker run --name oracle-free \
175+
-e ORACLE_PASSWORD=OraclePassword123 \
176+
-p 1521:1521 \
177+
-d gvenzl/oracle-free:latest
178+
179+
# Wait for the Oracle server to start and healthcheck to pass
180+
echo "Waiting for Oracle to start (this may take 1-2 minutes)..."
181+
until docker exec oracle-free /usr/local/bin/checkHealth.sh >/dev/null 2>&1; do
182+
sleep 5
183+
done
184+
echo "Oracle is ready!"
185+
186+
# Create a table in the Oracle server and insert some data
187+
docker exec -i oracle-free sqlplus system/OraclePassword123@FREEPDB1 <<EOF
188+
CREATE TABLE companies (
189+
id NUMBER PRIMARY KEY,
190+
name VARCHAR2(100)
191+
);
192+
193+
INSERT INTO companies (id, name) VALUES (1, 'Acme Corporation');
194+
INSERT INTO companies (id, name) VALUES (2, 'Widget Inc.');
195+
COMMIT;
196+
EXIT;
197+
EOF
198+
```
199+
200+
**Prerequisites:** The `rust-oracle` crate is built on ODPI-C, which loads the Oracle Instant Client libraries at runtime. Install Instant Client before running the example:
201+
202+
- **Linux (Debian/Ubuntu):**
203+
```bash
204+
apt-get install libaio1 wget unzip
205+
wget https://download.oracle.com/otn_software/linux/instantclient/instantclient-basiclite-linuxx64.zip
206+
unzip instantclient-basiclite-linuxx64.zip -d /opt/oracle
207+
export LD_LIBRARY_PATH=/opt/oracle/instantclient_XX_X:$LD_LIBRARY_PATH
208+
```
209+
210+
- **macOS:**
211+
```bash
212+
brew install instantclient-basic
213+
```
214+
215+
```bash
216+
# Run from repo folder
217+
cargo run -p datafusion-table-providers --example oracle --features oracle
218+
```
219+
168220
### Flight SQL
169221

170222
```bash

core/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ rust_decimal = { version = "1.38.0", features = ["db-postgres"] }
103103
adbc_driver_manager = { workspace = true, optional = true }
104104
adbc_core = { workspace = true, optional = true }
105105
r2d2_adbc = { version = "0.2.0", optional = true }
106+
oracle = { version = "0.6", optional = true }
107+
bb8-oracle = { version = "0.3", optional = true }
106108

107109
[dev-dependencies]
108110
anyhow = "1.0"
@@ -167,6 +169,14 @@ adbc = [
167169
"dep:r2d2",
168170
]
169171
adbc-federation = ["adbc", "federation"]
172+
oracle = [
173+
"dep:oracle",
174+
"dep:bb8",
175+
"dep:bb8-oracle",
176+
"dep:async-stream",
177+
"dep:arrow-schema",
178+
]
179+
oracle-federation = ["oracle", "federation"]
170180

171181
# docs.rs-specific configuration
172182
[package.metadata.docs.rs]

core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub mod flight;
1717
pub mod mysql;
1818
#[cfg(feature = "odbc")]
1919
pub mod odbc;
20+
#[cfg(feature = "oracle")]
21+
pub mod oracle;
2022
#[cfg(feature = "postgres")]
2123
pub mod postgres;
2224
#[cfg(feature = "sqlite")]

core/src/oracle.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use crate::sql::db_connection_pool::oraclepool::OracleConnectionPool;
2+
use crate::sql::{db_connection_pool, sql_provider_datafusion::SqlTable};
3+
use async_trait::async_trait;
4+
5+
use datafusion::error::DataFusionError;
6+
use datafusion::{
7+
catalog::{Session, TableProviderFactory},
8+
datasource::TableProvider,
9+
logical_expr::CreateExternalTable,
10+
sql::TableReference,
11+
};
12+
use secrecy::SecretString;
13+
use snafu::prelude::*;
14+
use std::collections::HashMap;
15+
use std::sync::Arc;
16+
17+
#[cfg(feature = "oracle-federation")]
18+
pub mod federation;
19+
pub mod sql_table;
20+
pub mod write;
21+
22+
use self::sql_table::OracleTable;
23+
use crate::sql::db_connection_pool::dbconnection::oracleconn::OraclePooledConnection;
24+
25+
#[derive(Debug, Snafu)]
26+
pub enum Error {
27+
#[snafu(display("DbConnectionError: {source}"))]
28+
DbConnectionError {
29+
source: db_connection_pool::dbconnection::GenericError,
30+
},
31+
32+
#[snafu(display("Unable to create Oracle connection pool: {source}"))]
33+
UnableToCreateConnectionPool {
34+
source: db_connection_pool::oraclepool::Error,
35+
},
36+
37+
#[snafu(display("Unable to create table provider: {source}"))]
38+
UnableToCreateTableProvider {
39+
source: Box<dyn std::error::Error + Send + Sync>,
40+
},
41+
}
42+
43+
pub type Result<T, E = Error> = std::result::Result<T, E>;
44+
45+
pub struct OracleTableFactory {
46+
pool: Arc<OracleConnectionPool>,
47+
}
48+
49+
impl OracleTableFactory {
50+
#[must_use]
51+
pub fn new(pool: Arc<OracleConnectionPool>) -> Self {
52+
Self { pool }
53+
}
54+
55+
pub async fn table_provider(
56+
&self,
57+
table_reference: TableReference,
58+
) -> Result<Arc<dyn TableProvider + 'static>, Box<dyn std::error::Error + Send + Sync>> {
59+
let pool = Arc::clone(&self.pool);
60+
let dyn_pool = pool as Arc<
61+
dyn db_connection_pool::DbConnectionPool<
62+
OraclePooledConnection,
63+
oracle::sql_type::OracleType,
64+
> + Send
65+
+ Sync
66+
+ 'static,
67+
>;
68+
69+
let table = SqlTable::new("oracle", &dyn_pool, table_reference)
70+
.await
71+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
72+
73+
let oracle_table = Arc::new(OracleTable::new(Arc::clone(&self.pool), table));
74+
75+
#[cfg(feature = "oracle-federation")]
76+
let oracle_table = Arc::new(
77+
oracle_table
78+
.create_federated_table_provider()
79+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
80+
);
81+
82+
Ok(oracle_table)
83+
}
84+
}
85+
86+
/// Stateless factory type used to create Oracle-backed table providers.
#[derive(Debug)]
pub struct OracleTableProviderFactory {}

impl OracleTableProviderFactory {
    /// Returns a new factory; the type holds no fields.
    #[must_use]
    pub fn new() -> Self {
        OracleTableProviderFactory {}
    }
}

impl Default for OracleTableProviderFactory {
    /// Produces the same value as [`OracleTableProviderFactory::new`].
    fn default() -> Self {
        OracleTableProviderFactory {}
    }
}
101+
102+
#[async_trait]
103+
impl TableProviderFactory for OracleTableProviderFactory {
104+
async fn create(
105+
&self,
106+
_state: &dyn Session,
107+
cmd: &CreateExternalTable,
108+
) -> datafusion::common::Result<Arc<dyn TableProvider>> {
109+
let name = cmd.name.to_string();
110+
let options = &cmd.options;
111+
112+
// Construct params from options
113+
let mut params: HashMap<String, SecretString> = HashMap::new();
114+
for (k, v) in options {
115+
params.insert(k.clone(), SecretString::from(v.clone()));
116+
}
117+
118+
let pool = OracleConnectionPool::new(params)
119+
.await
120+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
121+
122+
let factory = OracleTableFactory::new(Arc::new(pool));
123+
124+
let table = factory
125+
.table_provider(TableReference::from(name))
126+
.await
127+
.map_err(DataFusionError::External)?;
128+
129+
Ok(table)
130+
}
131+
}

core/src/oracle/federation.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use crate::sql::db_connection_pool::dbconnection::oracleconn::OraclePooledConnection;
2+
use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError};
3+
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
4+
use arrow::datatypes::SchemaRef;
5+
use async_trait::async_trait;
6+
use datafusion::sql::unparser::dialect::Dialect;
7+
use datafusion_federation::sql::{
8+
RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
9+
};
10+
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
11+
use futures::TryStreamExt;
12+
use snafu::ResultExt;
13+
use std::sync::Arc;
14+
15+
use super::sql_table::OracleTable;
16+
use datafusion::{
17+
datasource::TableProvider,
18+
error::{DataFusionError, Result as DataFusionResult},
19+
execution::SendableRecordBatchStream,
20+
physical_plan::stream::RecordBatchStreamAdapter,
21+
sql::TableReference,
22+
};
23+
24+
impl OracleTable {
25+
pub fn create_federated_table_source(
26+
self: Arc<Self>,
27+
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
28+
let table_reference = self.base_table.table_reference.clone();
29+
let schema = Arc::clone(&self.base_table.schema());
30+
let fed_provider = Arc::new(SQLFederationProvider::new(self.clone()));
31+
Ok(Arc::new(SQLTableSource::new_with_schema(
32+
fed_provider,
33+
RemoteTableRef::from(table_reference),
34+
schema,
35+
)))
36+
}
37+
38+
pub fn create_federated_table_provider(
39+
self: Arc<Self>,
40+
) -> DataFusionResult<FederatedTableProviderAdaptor> {
41+
let table_source = self.clone().create_federated_table_source()?;
42+
Ok(FederatedTableProviderAdaptor::new_with_provider(
43+
table_source,
44+
self,
45+
))
46+
}
47+
}
48+
49+
#[async_trait]
50+
impl SQLExecutor for OracleTable {
51+
fn name(&self) -> &str {
52+
self.base_table.name()
53+
}
54+
55+
fn compute_context(&self) -> Option<String> {
56+
None
57+
}
58+
59+
fn dialect(&self) -> Arc<dyn Dialect> {
60+
Arc::new(datafusion::sql::unparser::dialect::PostgreSqlDialect {})
61+
}
62+
63+
fn execute(
64+
&self,
65+
query: &str,
66+
schema: SchemaRef,
67+
) -> DataFusionResult<SendableRecordBatchStream> {
68+
let pool = self.base_table.clone_pool();
69+
let dyn_pool = pool as Arc<
70+
dyn crate::sql::db_connection_pool::DbConnectionPool<
71+
OraclePooledConnection,
72+
oracle::sql_type::OracleType,
73+
> + Send
74+
+ Sync,
75+
>;
76+
let fut = get_stream(dyn_pool, query.to_string(), Arc::clone(&schema));
77+
78+
let stream = futures::stream::once(fut).try_flatten();
79+
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
80+
}
81+
82+
async fn table_names(&self) -> DataFusionResult<Vec<String>> {
83+
Err(DataFusionError::NotImplemented(
84+
"table inference not implemented".to_string(),
85+
))
86+
}
87+
88+
async fn get_table_schema(&self, table_name: &str) -> DataFusionResult<SchemaRef> {
89+
let pool = self.base_table.clone_pool();
90+
let dyn_pool = pool as Arc<
91+
dyn crate::sql::db_connection_pool::DbConnectionPool<
92+
OraclePooledConnection,
93+
oracle::sql_type::OracleType,
94+
> + Send
95+
+ Sync,
96+
>;
97+
let conn = dyn_pool.connect().await.map_err(to_execution_error)?;
98+
get_schema(conn, &TableReference::from(table_name))
99+
.await
100+
.boxed()
101+
.map_err(|e| DbError::UnableToGetSchema { source: e })
102+
.map_err(to_execution_error)
103+
}
104+
}

0 commit comments

Comments
 (0)