Skip to content

Commit 60440c7

Browse files
committed
Add docs and fixes
1 parent 7da72d8 commit 60440c7

8 files changed

Lines changed: 127 additions & 45 deletions

File tree

datafusion-federation/examples/df-csv-advanced.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn main() {
7878
let sqlite_federation_provider = Arc::new(SQLFederationProvider::new(sqlite_executor));
7979
// Create the schema provider
8080
let sqlite_schema_provider = Arc::new(
81-
SQLSchemaProvider::new_with_table_names(sqlite_federation_provider, sqlite_known_tables)
81+
SQLSchemaProvider::new_with_tables(sqlite_federation_provider, sqlite_known_tables)
8282
.await
8383
.expect("Create new schema provider with tables"),
8484
);
@@ -107,12 +107,9 @@ async fn main() {
107107
let postgres_federation_provider = Arc::new(SQLFederationProvider::new(postgres_executor));
108108
// Create the schema provider
109109
let postgres_schema_provider = Arc::new(
110-
SQLSchemaProvider::new_with_table_names(
111-
postgres_federation_provider,
112-
postgres_known_tables,
113-
)
114-
.await
115-
.expect("Create new schema provider with tables"),
110+
SQLSchemaProvider::new_with_tables(postgres_federation_provider, postgres_known_tables)
111+
.await
112+
.expect("Create new schema provider with tables"),
116113
);
117114

118115
/////////////////////

datafusion-federation/examples/df-csv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async fn main() -> Result<()> {
3030

3131
// Get the schema
3232
let schema_provider =
33-
Arc::new(SQLSchemaProvider::new_with_table_names(provider, known_tables).await?);
33+
Arc::new(SQLSchemaProvider::new_with_tables(provider, known_tables).await?);
3434

3535
// Main context
3636
let state = datafusion_federation::default_session_state();

datafusion-federation/src/sql/analyzer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use super::SQLTableSource;
2020

2121
type Result<T> = std::result::Result<T, datafusion::error::DataFusionError>;
2222

23+
/// Rewrite LogicalPlan's table scans and expressions to use the federated table name.
2324
#[derive(Debug)]
2425
pub struct RewriteTableScanAnalyzer;
2526

@@ -31,7 +32,7 @@ impl RewriteTableScanAnalyzer {
3132
}
3233

3334
/// Rewrite table scans to use the original federated table name.
34-
pub fn rewrite_table_scans(
35+
fn rewrite_table_scans(
3536
plan: &LogicalPlan,
3637
known_rewrites: &mut HashMap<TableReference, TableReference>,
3738
) -> Result<LogicalPlan> {
@@ -176,7 +177,7 @@ pub fn rewrite_column_name_in_expr(
176177
}
177178
}
178179

179-
pub fn rewrite_table_scans_in_expr(
180+
fn rewrite_table_scans_in_expr(
180181
expr: Expr,
181182
known_rewrites: &mut HashMap<TableReference, TableReference>,
182183
) -> Result<Expr> {

datafusion-federation/src/sql/ast_analyzer.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::ops::ControlFlow;
22

33
use datafusion::sql::{
4-
sqlparser::ast::{TableFactor, TableFunctionArgs, VisitorMut},
4+
sqlparser::ast::{FunctionArg, TableFactor, TableFunctionArgs, VisitorMut},
55
TableReference,
66
};
77

@@ -15,21 +15,60 @@ pub fn replace_table_args_analyzer(mut visitor: TableArgReplace) -> AstAnalyzer
1515
Box::new(x)
1616
}
1717

18+
/// Used to construct an `AstAnalyzer` that can replace table arguments.
19+
///
20+
/// ```rust
21+
/// use datafusion::sql::{sqlparser::ast::{Expr, FunctionArg, Value}, TableReference};
22+
/// use datafusion_federation::sql::ast_analyzer::TableArgReplace;
23+
///
24+
/// let mut analyzer = TableArgReplace::default().with(
25+
/// TableReference::parse_str("table1"),
26+
/// vec![FunctionArg::Unnamed(
27+
/// Expr::Value(
28+
/// Value::Number("1".to_string(), false),
29+
/// )
30+
/// .into(),
31+
/// )],
32+
/// );
33+
/// let analyzer = analyzer.into_analyzer();
34+
/// ```
1835
#[derive(Debug, Clone, PartialEq, Eq, Default)]
1936
pub struct TableArgReplace {
2037
pub tables: Vec<(TableReference, TableFunctionArgs)>,
2138
}
2239

2340
impl TableArgReplace {
24-
pub fn new(tables: Vec<(TableReference, TableFunctionArgs)>) -> Self {
25-
Self { tables }
41+
/// Constructs a new `TableArgReplace` instance.
42+
pub fn new(tables: Vec<(TableReference, Vec<FunctionArg>)>) -> Self {
43+
Self {
44+
tables: tables
45+
.into_iter()
46+
.map(|(table, args)| {
47+
(
48+
table,
49+
TableFunctionArgs {
50+
args,
51+
settings: None,
52+
},
53+
)
54+
})
55+
.collect(),
56+
}
2657
}
2758

28-
pub fn with(mut self, table: TableReference, args: TableFunctionArgs) -> Self {
29-
self.tables.push((table, args));
59+
/// Adds a new table argument replacement.
60+
pub fn with(mut self, table: TableReference, args: Vec<FunctionArg>) -> Self {
61+
self.tables.push((
62+
table,
63+
TableFunctionArgs {
64+
args,
65+
settings: None,
66+
},
67+
));
3068
self
3169
}
3270

71+
/// Converts the `TableArgReplace` instance into an `AstAnalyzer`.
3372
pub fn into_analyzer(self) -> AstAnalyzer {
3473
replace_table_args_analyzer(self)
3574
}

datafusion-federation/src/sql/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pub mod analyzer;
1+
mod analyzer;
22
pub mod ast_analyzer;
33
mod executor;
44
mod schema;
@@ -368,7 +368,6 @@ mod tests {
368368
#[derive(Debug, Clone)]
369369
struct TestExecutor {
370370
compute_context: String,
371-
rewrite_table: bool,
372371
}
373372

374373
#[async_trait]
@@ -389,11 +388,6 @@ mod tests {
389388
unimplemented!()
390389
}
391390

392-
fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
393-
self.rewrite_table
394-
.then_some(Box::new(analyzer::RewriteTableScanAnalyzer::rewrite))
395-
}
396-
397391
async fn table_names(&self) -> Result<Vec<String>> {
398392
unimplemented!()
399393
}
@@ -448,12 +442,10 @@ mod tests {
448442
async fn basic_sql_federation_test() -> Result<(), DataFusionError> {
449443
let test_executor_a = TestExecutor {
450444
compute_context: "a".into(),
451-
rewrite_table: false,
452445
};
453446

454447
let test_executor_b = TestExecutor {
455448
compute_context: "b".into(),
456-
rewrite_table: true,
457449
};
458450

459451
let table_a1_ref = "table_a1".to_string();

datafusion-federation/src/sql/schema.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,25 @@ use std::{any::Any, str::FromStr, sync::Arc};
22

33
use async_trait::async_trait;
44
use datafusion::{
5-
catalog::SchemaProvider, datasource::TableProvider, error::Result, sql::TableReference,
5+
catalog::SchemaProvider,
6+
datasource::TableProvider,
7+
error::{DataFusionError, Result},
8+
sql::TableReference,
69
};
710
use futures::future::join_all;
811

912
use super::{table::SQLTable, RemoteTableRef, SQLTableSource};
1013
use crate::{sql::SQLFederationProvider, FederatedTableProviderAdaptor};
1114

15+
/// An in-memory schema provider for SQL tables.
1216
#[derive(Debug)]
1317
pub struct SQLSchemaProvider {
1418
tables: Vec<Arc<SQLTableSource>>,
1519
}
1620

1721
impl SQLSchemaProvider {
22+
/// Creates a new SQLSchemaProvider from a [`SQLFederationProvider`].
23+
/// Initializes the schema provider by fetching table names and schema from the federation provider's executor.
1824
pub async fn new(provider: Arc<SQLFederationProvider>) -> Result<Self> {
1925
let executor = Arc::clone(&provider.executor);
2026
let tables = executor
@@ -41,14 +47,19 @@ impl SQLSchemaProvider {
4147
Ok(Self { tables })
4248
}
4349

44-
pub async fn new_with_table_names(
50+
/// Creates a new SQLSchemaProvider from a SQLFederationProvider and a list of table references.
51+
/// Fetches the schema for each table using the executor's implementation.
52+
pub async fn new_with_tables<T>(
4553
provider: Arc<SQLFederationProvider>,
46-
tables: Vec<String>,
47-
) -> Result<Self> {
54+
tables: Vec<T>,
55+
) -> Result<Self>
56+
where
57+
T: TryInto<RemoteTableRef, Error = DataFusionError>,
58+
{
4859
let tables = tables
4960
.into_iter()
50-
.map(|t| RemoteTableRef::from_str(&t))
51-
.collect::<Result<Vec<_>>>()?;
61+
.map(|t| t.try_into())
62+
.collect::<Result<Vec<RemoteTableRef>>>()?;
5263
let futures: Vec<_> = tables
5364
.into_iter()
5465
.map(|t| SQLTableSource::new(Arc::clone(&provider), t))
@@ -58,7 +69,8 @@ impl SQLSchemaProvider {
5869
Ok(Self { tables })
5970
}
6071

61-
pub fn new_with_tables(
72+
/// Creates a new SQLSchemaProvider from a SQLFederationProvider and a list of custom table instances.
73+
pub fn new_with_custom_tables(
6274
provider: Arc<SQLFederationProvider>,
6375
tables: Vec<Arc<dyn SQLTable>>,
6476
) -> Self {

datafusion-federation/src/sql/table.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use datafusion::arrow::datatypes::SchemaRef;
55
use datafusion::error::Result;
66
use datafusion::logical_expr::TableSource;
77
use datafusion::logical_expr::TableType;
8-
use datafusion::sql::sqlparser::ast::TableFunctionArgs;
98
use datafusion::sql::TableReference;
109
use std::any::Any;
1110
use std::sync::Arc;
@@ -15,16 +14,22 @@ use super::executor::LogicalOptimizer;
1514
use super::AstAnalyzer;
1615
use super::RemoteTableRef;
1716

17+
/// Trait to represent a SQL remote table inside [`SQLTableSource`].
18+
/// A remote table provides information such as schema, table reference, and
19+
/// provides hooks for rewriting the logical plan and AST before execution.
20+
/// This crate provides [`RemoteTable`] as a default ready-to-use type.
1821
pub trait SQLTable: std::fmt::Debug + Send + Sync {
1922
/// Returns a reference as a trait object.
2023
fn as_any(&self) -> &dyn Any;
21-
/// Table reference for the table, which can be used to identify the table in the SQL query.
22-
/// This method should return a unique identifier for the table.
23-
/// Used for registering the table to sql schema provider.
24+
/// Provides the [`TableReference`](`datafusion::sql::TableReference`) used to identify the table in SQL queries.
25+
/// This TableReference is used for registering the table with the [`SQLSchemaProvider`](`super::SQLSchemaProvider`).
26+
/// If the table provider is registered in the DataFusion context under a different name,
27+
/// the logical plan will be rewritten to use this table reference during execution.
28+
/// Therefore, any AST analyzer should match against this table reference.
2429
fn table_reference(&self) -> TableReference;
2530
/// Schema of the remote table
2631
fn schema(&self) -> SchemaRef;
27-
/// Returns an logical optimizer specific to this table, will be used to modify the logical plan before execution
32+
/// Returns a logical optimizer specific to this table, which will be used to modify the logical plan before execution.
2833
fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
2934
None
3035
}
@@ -34,20 +39,33 @@ pub trait SQLTable: std::fmt::Debug + Send + Sync {
3439
}
3540
}
3641

37-
#[derive(Debug)]
42+
/// Represents a remote table with a reference and schema.
43+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3844
pub struct RemoteTable {
3945
table_ref: RemoteTableRef,
4046
schema: SchemaRef,
4147
}
4248

4349
impl RemoteTable {
50+
/// Creates a new `RemoteTable` instance.
51+
///
52+
/// Examples:
53+
/// ```rust,ignore
54+
/// RemoteTable::new("myschema.table", schema);
55+
/// RemoteTable::new(r#"myschema."Table""#, schema);
56+
/// RemoteTable::new("myschema.view('obj')", schema);
57+
/// RemoteTable::new("myschema.view(name => 'obj')", schema);
58+
/// RemoteTable::new("myschema.view(name = 'obj')", schema);
59+
/// ```
4460
pub fn new(table_ref: impl Into<RemoteTableRef>, schema: SchemaRef) -> Self {
4561
Self {
4662
table_ref: table_ref.into(),
4763
schema,
4864
}
4965
}
5066

67+
/// Return table reference of this remote table.
68+
/// Only returns the object name, ignoring function arguments, if any.
5169
pub fn table_reference(&self) -> TableReference {
5270
TableReference::from(&self.table_ref)
5371
}
@@ -70,17 +88,12 @@ impl SQLTable for RemoteTable {
7088
None
7189
}
7290

91+
/// Returns an AST analyzer that replaces this table's function arguments when its reference is a `TableReferenceWithArgs`.
7392
fn ast_analyzer(&self) -> Option<AstAnalyzer> {
7493
if let RemoteTableRef::TableReferenceWithArgs(name, args) = &self.table_ref {
7594
Some(
7695
ast_analyzer::TableArgReplace::default()
77-
.with(
78-
name.clone(),
79-
TableFunctionArgs {
80-
args: args.iter().cloned().collect(),
81-
settings: None,
82-
},
83-
)
96+
.with(name.clone(), args.iter().cloned().collect())
8497
.into_analyzer(),
8598
)
8699
} else {
@@ -107,6 +120,7 @@ impl SQLTableSource {
107120
Ok(Self::new_with_schema(provider, remote_table_ref, schema))
108121
}
109122

123+
/// Create a SQLTableSource with a table reference and schema
110124
pub fn new_with_schema(
111125
provider: Arc<SQLFederationProvider>,
112126
table_ref: impl Into<RemoteTableRef>,
@@ -118,10 +132,12 @@ impl SQLTableSource {
118132
}
119133
}
120134

135+
/// Create a new SQLTableSource with a custom SQLTable instance.
121136
pub fn new_with_table(provider: Arc<SQLFederationProvider>, table: Arc<dyn SQLTable>) -> Self {
122137
Self { provider, table }
123138
}
124139

140+
/// Return associated table reference of stored remote table
125141
pub fn table_reference(&self) -> TableReference {
126142
self.table.table_reference()
127143
}

datafusion-federation/src/sql/table_reference.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,24 @@ use datafusion::{
88
},
99
};
1010

11-
#[derive(Debug, Clone, PartialEq, Eq)]
11+
/// A multipart identifier to a remote table, view or parameterized view.
12+
///
13+
/// RemoteTableRef can be created by parsing a string representing a table object with optional function arguments:
14+
/// ```rust,ignore
15+
/// RemoteTableRef::from_str("myschema.table")
16+
/// RemoteTableRef::from_str(r#"myschema."Table""#)
17+
/// RemoteTableRef::from_str("myschema.view('obj')")
18+
/// RemoteTableRef::from_str("myschema.view(name => 'obj')")
19+
/// RemoteTableRef::from_str("myschema.view(name = 'obj')")
20+
/// ```
21+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1222
pub enum RemoteTableRef {
1323
TableReference(TableReference),
1424
TableReferenceWithArgs(TableReference, Arc<[FunctionArg]>),
1525
}
1626

1727
impl RemoteTableRef {
28+
/// Get the quoted string representation of the table it is referencing; this is the same as calling `to_quoted_string` on the inner table reference.
1829
pub fn to_quoted_string(&self) -> String {
1930
match self {
2031
RemoteTableRef::TableReference(table_ref) => table_ref.to_quoted_string(),
@@ -60,6 +71,20 @@ impl TryFrom<&str> for RemoteTableRef {
6071
}
6172
}
6273

74+
impl TryFrom<String> for RemoteTableRef {
75+
type Error = datafusion::error::DataFusionError;
76+
fn try_from(s: String) -> Result<Self, Self::Error> {
77+
s.parse()
78+
}
79+
}
80+
81+
impl TryFrom<&String> for RemoteTableRef {
82+
type Error = datafusion::error::DataFusionError;
83+
fn try_from(s: &String) -> Result<Self, Self::Error> {
84+
s.parse()
85+
}
86+
}
87+
6388
impl FromStr for RemoteTableRef {
6489
type Err = datafusion::error::DataFusionError;
6590

0 commit comments

Comments
 (0)