Skip to content

df47 bump + fmt #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 28 additions & 29 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,30 @@ license = "Apache-2.0"
description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."

[dependencies]
arrow = "53.0.0"
arrow-array = { version = "53.0.0", optional = true }
arrow-cast = { version = "53.0.0", optional = true }
arrow-flight = { version = "53.0.0", optional = true, features = [
arrow = "55.0.0"
arrow-array = { version = "55.0.0", optional = true }
arrow-cast = { version = "55.0.0", optional = true }
arrow-flight = { version = "55.0.0", optional = true, features = [
"flight-sql-experimental",
"tls",
] }
arrow-schema = { version = "53.0.0", optional = true, features = ["serde"] }
arrow-json = "53.0.0"
arrow-schema = { version = "55.0.0", optional = true, features = ["serde"] }
arrow-json = "55.0.0"
async-stream = { version = "0.3.5", optional = true }
async-trait = "0.1.80"
num-bigint = "0.4.4"
base64 = { version = "0.22.1", optional = true }
bytes = { version = "1.7.1", optional = true }
bigdecimal = "0.4.5"
bigdecimal_0_3_0 = { package = "bigdecimal", version = "0.3.0" }
bigdecimal = "0.4.8"
#bigdecimal_0_3_0 = { package = "bigdecimal", version = "0.4.7" }
byteorder = "1.5.0"
chrono = "0.4.38"
datafusion = "42.0.0"
datafusion-expr = { version = "42.0.0", optional = true }
datafusion-physical-expr = { version = "42.0.0", optional = true }
datafusion-physical-plan = { version = "42.0.0", optional = true }
datafusion-proto = { version = "42.0.0", optional = true }
datafusion-federation = { version = "0.3.0", features = ["sql"] }
datafusion = "47.0.0"
datafusion-expr = { version = "47.0.0", optional = true }
datafusion-physical-expr = { version = "47.0.0", optional = true }
datafusion-physical-plan = { version = "47.0.0", optional = true }
datafusion-proto = { version = "47.0.0", optional = true }
datafusion-federation = { version = "0.4.2", features = ["sql"] }
duckdb = { version = "1", features = [
"bundled",
"r2d2",
Expand All @@ -41,14 +41,14 @@ duckdb = { version = "1", features = [
], optional = true }
fallible-iterator = "0.3.0"
futures = "0.3.30"
mysql_async = { version = "0.34.1", features = [
mysql_async = { version = "0.35.1", features = [
"native-tls-tls",
"chrono",
], optional = true }
prost = { version = "0.13.3", optional = true } # pinned for arrow-flight compat
r2d2 = { version = "0.8.10", optional = true }
rusqlite = { version = "0.31.0", optional = true }
sea-query = { version = "0.31.0", features = [
rusqlite = { version = "0.32.0", optional = true }
sea-query = { version = "0.32.3", features = [
"backend-sqlite",
"backend-postgres",
"postgres-array",
Expand All @@ -57,7 +57,7 @@ sea-query = { version = "0.31.0", features = [
"with-time",
"with-chrono",
] }
secrecy = "0.8.0"
secrecy = "0.10.3"
serde = { version = "1.0.209", optional = true }
serde_json = "1.0.124"
snafu = "0.8.3"
Expand All @@ -72,29 +72,28 @@ tokio-postgres = { version = "0.7.10", features = [
tracing = "0.1.40"
uuid = { version = "1.9.1", optional = true }
postgres-native-tls = { version = "0.5.0", optional = true }
bb8 = { version = "0.8", optional = true }
bb8-postgres = { version = "0.8", optional = true }
bb8 = { version = "0.9", optional = true }
bb8-postgres = { version = "0.9", optional = true }
native-tls = { version = "0.2.11", optional = true }
trust-dns-resolver = "0.23.2"
url = "2.5.1"
pem = { version = "3.0.4", optional = true }
tokio-rusqlite = { version = "0.5.1", optional = true }
tonic = { version = "0.11", optional = true } # pinned for arrow-flight compat
itertools = "0.13.0"
tokio-rusqlite = { version = "0.6.0", optional = true }
tonic = { version = "0.13.0", optional = true } # pinned for arrow-flight compat
itertools = "0.14.0"
geo-types = "0.7.13"

[dev-dependencies]
anyhow = "1.0.86"
bollard = "0.16.1"
rand = "0.8.5"
bollard = "0.18.1"
rand = "0.9.0"
reqwest = "0.12.5"
secrecy = "0.8.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
test-log = { version = "0.2.16", features = ["trace"] }
rstest = "0.22.0"
geozero = { version = "0.13.0", features = ["with-wkb"] }
rstest = "0.25.0"
geozero = { version = "0.14.0", features = ["with-wkb"] }
tokio-stream = { version = "0.1.15", features = ["net"] }
arrow-schema = "52.2.0"
arrow-schema = "55.0.0"

[features]
mysql = ["dep:mysql_async", "dep:async-stream"]
Expand Down
30 changes: 30 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Always show full backtraces from any cargo/rust command run via just.
export RUST_BACKTRACE := "1"


# Default recipe: list all available recipes when `just` is run bare.
default:
@just --list

# Run clippy on the nightly toolchain.
# NOTE(review): -A (allow) suppresses the all/pedantic/nursery lint groups,
# so this recipe effectively disables most lints — confirm -A vs -W is intended.
lint:
cargo +nightly clippy \
--verbose \
-- -Aclippy::all \
-Aclippy::pedantic \
-Aclippy::nursery

# Build the crate with the default toolchain.
build:
cargo build

# Format the codebase with nightly rustfmt (needed for unstable fmt options).
fmt:
cargo +nightly fmt

# Bump dependency versions in Cargo.toml and refresh Cargo.lock.
# Runs `fmt` first (declared dependency). Requires cargo-edit for `cargo upgrade`.
upgrade: fmt
@cargo upgrade
@cargo update

# Run the test suite.
test:
cargo test
4 changes: 2 additions & 2 deletions src/sql/arrow_sql_gen/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow::{
datatypes::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit},
util::display::array_value_to_string,
};
use bigdecimal_0_3_0::BigDecimal;
use bigdecimal::BigDecimal;
use chrono::{DateTime, FixedOffset};
use num_bigint::BigInt;
use sea_query::{
Expand All @@ -16,7 +16,7 @@ use sea_query::{
QueryBuilder, SimpleExpr, SqliteQueryBuilder, StringLen, Table,
};
use snafu::Snafu;
use std::{any::Any, sync::Arc};
use std::sync::Arc;
use time::{OffsetDateTime, PrimitiveDateTime};

#[derive(Debug, Snafu)]
Expand Down
2 changes: 1 addition & 1 deletion src/sql/db_connection_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum JoinPushDown {
}

#[async_trait]
pub trait DbConnectionPool<T, P: 'static> {
pub trait DbConnectionPool<T, P: 'static>: std::fmt::Debug + Send + Sync {
async fn connect(&self) -> Result<Box<dyn DbConnection<T, P>>>;

fn join_push_down(&self) -> JoinPushDown;
Expand Down
16 changes: 10 additions & 6 deletions src/sql/sql_provider_datafusion/federation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::sql::db_connection_pool::{dbconnection::get_schema, JoinPushDown};
use async_trait::async_trait;
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLTableSource};
use datafusion_federation::sql::{
RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
};
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
use futures::TryStreamExt;
use snafu::prelude::*;
Expand All @@ -16,18 +18,20 @@ use datafusion::{
sql::{unparser::dialect::Dialect, TableReference},
};

impl<T, P> SqlTable<T, P> {
impl<T: std::fmt::Display + std::fmt::Debug, P: std::fmt::Display + std::fmt::Debug>
SqlTable<T, P>
{
fn create_federated_table_source(
self: Arc<Self>,
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
let table_name = self.table_reference.to_quoted_string();
let table_ref = RemoteTableRef::try_from(self.table_reference.to_quoted_string())?;
let schema = Arc::clone(&self.schema);
let fed_provider = Arc::new(SQLFederationProvider::new(self));
Ok(Arc::new(SQLTableSource::new_with_schema(
fed_provider,
table_name,
table_ref,
schema,
)?))
)))
}

pub fn create_federated_table_provider(
Expand All @@ -42,7 +46,7 @@ impl<T, P> SqlTable<T, P> {
}

#[async_trait]
impl<T, P> SQLExecutor for SqlTable<T, P> {
impl<T: std::fmt::Display, P: std::fmt::Display> SQLExecutor for SqlTable<T, P> {
fn name(&self) -> &str {
self.name
}
Expand Down
23 changes: 14 additions & 9 deletions src/sql/sql_provider_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ use datafusion::{
datasource::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
execution::TaskContext,
logical_expr::{Expr, TableProviderFilterPushDown, TableType},
logical_expr::{
sqlparser::tokenizer::{Location, Span},
Expr, TableProviderFilterPushDown, TableType,
},
physical_expr::EquivalenceProperties,
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode,
ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream,
execution_plan::{Boundedness, EmissionType},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream,
},
sql::{sqlparser::ast, unparser::Unparser, TableReference},
};
Expand Down Expand Up @@ -78,7 +83,7 @@ impl Engine {

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct SqlTable<T: 'static, P: 'static> {
name: &'static str,
pool: Arc<dyn DbConnectionPool<T, P> + Send + Sync>,
Expand Down Expand Up @@ -165,7 +170,7 @@ impl<T, P> SqlTable<T, P> {
}

#[async_trait]
impl<T, P> TableProvider for SqlTable<T, P> {
impl<T: std::fmt::Debug, P: std::fmt::Debug> TableProvider for SqlTable<T, P> {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -262,7 +267,8 @@ impl<T, P> SqlExec<T, P> {
properties: PlanProperties::new(
EquivalenceProperties::new(projected_schema),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Both,
Boundedness::Bounded,
),
engine,
})
Expand Down Expand Up @@ -340,6 +346,7 @@ impl<T, P> SqlExec<T, P> {
ast::Expr::Identifier(ast::Ident {
value: ident.to_string(),
quote_style,
span: Span::new(Location::empty(), Location::empty()),
})
.to_string()
}
Expand Down Expand Up @@ -427,12 +434,9 @@ pub fn to_execution_error(
mod tests {
use std::{error::Error, sync::Arc};

use datafusion::execution::context::SessionContext;
use datafusion::sql::TableReference;
use tracing::{level_filters::LevelFilter, subscriber::DefaultGuard, Dispatch};

use crate::sql::sql_provider_datafusion::SqlTable;

fn setup_tracing() -> DefaultGuard {
let subscriber: tracing_subscriber::FmtSubscriber = tracing_subscriber::fmt()
.with_max_level(LevelFilter::DEBUG)
Expand Down Expand Up @@ -471,6 +475,7 @@ mod tests {
}
}

#[derive(Debug)]
struct MockDBPool {}

#[async_trait]
Expand Down
4 changes: 1 addition & 3 deletions src/util/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ async fn validate_batch_with_constraint(
let ctx = SessionContext::new();
let df = ctx.read_batches(batches).context(DataFusionSnafu)?;

let Ok(count_name) = count(lit(COUNT_STAR_EXPANSION)).display_name() else {
unreachable!()
};
let count_name = count(lit(COUNT_STAR_EXPANSION)).schema_name().to_string();

// This is equivalent to:
// ```sql
Expand Down
2 changes: 1 addition & 1 deletion src/util/secrets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub fn to_secret_map<S: ::std::hash::BuildHasher + ::std::default::Default>(
map: HashMap<String, String, S>,
) -> HashMap<String, SecretString, S> {
map.into_iter()
.map(|(k, v)| (k, SecretString::new(v)))
.map(|(k, v)| (k, SecretString::new(v.into()).into()))
.collect()
}
9 changes: 7 additions & 2 deletions src/util/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use datafusion::{
physical_expr::EquivalenceProperties,
physical_plan::{
common,
execution_plan::{Boundedness, EmissionType},
stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
},
};

Expand Down Expand Up @@ -59,7 +60,8 @@ impl MockExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Both,
Boundedness::Bounded,
)
}
}
Expand All @@ -70,6 +72,9 @@ impl DisplayAs for MockExec {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "MockExec")
}
DisplayFormatType::TreeRender => {
write!(f, "MockExec")
}
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions tests/arrow_record_batch_gen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ use arrow::array::RecordBatch;
use arrow::{
array::*,
datatypes::{
i256, DataType, Date32Type, Date64Type, Field, Fields, IntervalDayTime,
IntervalMonthDayNano, IntervalUnit, IntervalYearMonthType, Schema, SchemaRef, TimeUnit,
i256, DataType, Date32Type, Date64Type, Field, IntervalDayTime, IntervalMonthDayNano,
IntervalUnit, Schema, SchemaRef, TimeUnit,
},
};
use chrono::NaiveDate;
use std::sync::Arc;
use types::IntervalDayTimeType;

// Helper functions to create arrow record batches of different types

Expand Down