Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ datafusion-common-runtime = "52.0.0"
datafusion-datasource = "52.0.0"
datafusion-execution = "52.0.0"
datafusion-expr = "52.0.0"
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "a4fce79433e5c2fe779427c0bfce6a599193e300", features = ["sql"] } # branch: trunk; SHA format (short vs full) must match to what datafusion-table-providers use for datafusion-federation
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "6b6bfb0d30da8e5c2eb851094e366d98fa839575", features = ["sql"] } # branch: trunk; SHA format (short vs full) must match to what datafusion-table-providers use for datafusion-federation
datafusion-functions = "52.0.0"
datafusion-functions-json = "0.52"
datafusion-physical-expr = "52.0.0"
Expand Down Expand Up @@ -427,7 +427,7 @@ datafusion-physical-optimizer = { git = "https://github.com/spiceai/datafusion.g
datafusion-spark = { git = "https://github.com/spiceai/datafusion.git", rev = "06e4b624c6073c40c7b2127ce620e281ec1979ae" } # spiceai-52.5
datafusion-substrait = { git = "https://github.com/spiceai/datafusion.git", rev = "06e4b624c6073c40c7b2127ce620e281ec1979ae" } # spiceai-52.5

datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "29f66085875f1d4d7b80fc1947e68c6835302241" } # spiceai-52
datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "466acff6af701872ba41de99c1783df2a288ddf8" } # spiceai-52

ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "07be66a8efff7e20c2abadaaba6ef62b02fc1ffc" } # spiceai-52.5
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "07be66a8efff7e20c2abadaaba6ef62b02fc1ffc" } # spiceai-52.5
Expand Down
2 changes: 1 addition & 1 deletion crates/cayenne/src/metastore/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn configure_sqlite_connection(
///
/// Pool size is `min(available_parallelism, 32)` (minimum 2). If
/// `available_parallelism()` fails (rare — e.g. seccomp-restricted
/// environments), K falls back to 4. `SQLite` WAL mode allows many
/// environments), `K` falls back to 4. `SQLite` WAL mode allows many
/// concurrent readers per database file (read-only operations don't take
/// the WAL write lock), so a larger pool lifts the read-side concurrency
/// ceiling for metadata-heavy workloads — e.g. 64-core deployments running
Expand Down
1 change: 1 addition & 0 deletions crates/data-connectors/connector-mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ description = "MySQL data connector for Spice.ai runtime"
[dependencies]
async-trait.workspace = true
chrono.workspace = true
data_components = { path = "../../data_components", features = ["mysql"] }
datafusion.workspace = true
datafusion-table-providers = { workspace = true, features = ["mysql", "mysql-federation"] }
linkme.workspace = true
Expand Down
87 changes: 83 additions & 4 deletions crates/data-connectors/connector-mysql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_table_providers::sql::db_connection_pool::{
Error as DbConnectionPoolError, dbconnection,
mysqlpool::{self, MySQLConnectionPool},
};
use mysql_async::Metrics;
use mysql_async::{Metrics, prelude::Queryable};
use opentelemetry::KeyValue;
use runtime::component::ComponentType;
use runtime::component::dataset::Dataset;
Expand All @@ -36,6 +36,7 @@ use runtime::parameters::ParameterSpec;
use secrecy::ExposeSecret;
use snafu::prelude::*;
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -58,6 +59,7 @@ const DEFAULT_CONNECTION_POOL_MAX: usize = 5;

pub struct MySQL {
mysql_factory: MySQLTableFactory,
pool: Arc<MySQLConnectionPool>,
}

impl std::fmt::Debug for MySQL {
Expand Down Expand Up @@ -258,9 +260,12 @@ impl DataConnectorFactory for MySQLFactory {
}
},
};
let mysql_factory = MySQLTableFactory::new(pool);
let mysql_factory = MySQLTableFactory::new(Arc::clone(&pool));

Ok(Arc::new(MySQL { mysql_factory }) as Arc<dyn DataConnector>)
Ok(Arc::new(MySQL {
mysql_factory,
pool,
}) as Arc<dyn DataConnector>)
})
}

Expand All @@ -277,6 +282,74 @@ impl DataConnectorFactory for MySQLFactory {
}
}

/// Reads table-level and column-level comment metadata for `table_reference`
/// from MySQL's `information_schema`.
///
/// The lookup is scoped to the schema component of the reference, falling back
/// to its catalog component; when neither is present the query uses the
/// connection's current database (`DATABASE()`). Empty comments are turned into
/// `NULL` by the query (`NULLIF(..., '')`), and rows come back ordered by the
/// column's ordinal position.
///
/// # Errors
///
/// Returns the underlying error if a direct connection cannot be obtained from
/// `pool` or if the metadata query fails.
async fn mysql_comment_metadata(
    pool: &Arc<MySQLConnectionPool>,
    table_reference: &datafusion::sql::TableReference,
) -> std::result::Result<
    (HashMap<String, String>, data_components::FieldMetadata),
    Box<dyn std::error::Error + Send + Sync>,
> {
    use data_components::mysql::provider::{MySqlTableMetadataRow, mysql_metadata_from_rows};

    // One row per column; the table comment is repeated on every row.
    const COMMENT_QUERY: &str = "SELECT \
        NULLIF(t.TABLE_COMMENT, '') AS TABLE_COMMENT, \
        c.COLUMN_NAME, \
        NULLIF(c.COLUMN_COMMENT, '') AS COLUMN_COMMENT, \
        c.COLUMN_TYPE \
        FROM information_schema.TABLES t \
        LEFT JOIN information_schema.COLUMNS c \
        ON c.TABLE_SCHEMA = t.TABLE_SCHEMA \
        AND c.TABLE_NAME = t.TABLE_NAME \
        WHERE t.TABLE_SCHEMA = COALESCE(?, DATABASE()) \
        AND t.TABLE_NAME = ? \
        ORDER BY c.ORDINAL_POSITION";

    // Resolve the query parameters up front; this needs no connection.
    let schema_param: Option<String> = match table_reference.schema() {
        Some(schema) => Some(schema.to_string()),
        None => table_reference.catalog().map(ToString::to_string),
    };
    let name_param = table_reference.table().to_string();

    let direct = pool.connect_direct().await?;
    let mut guard = direct.conn.lock().await;

    let metadata_rows: Vec<MySqlTableMetadataRow> =
        guard.exec(COMMENT_QUERY, (schema_param, name_param)).await?;

    Ok(mysql_metadata_from_rows(metadata_rows))
}

/// Wraps `provider` with MySQL comment metadata when any is available.
///
/// This is best-effort: if the comment lookup fails, a warning is logged and
/// the original `provider` is returned unchanged. The original `provider` is
/// also returned as-is when both the table-level and field-level metadata are
/// empty, so no wrapper is added without something to attach.
async fn enrich_with_mysql_comments(
    pool: &Arc<MySQLConnectionPool>,
    dataset: &Dataset,
    table_reference: &datafusion::sql::TableReference,
    provider: Arc<dyn TableProvider>,
) -> Arc<dyn TableProvider> {
    let (table_metadata, field_metadata) =
        match mysql_comment_metadata(pool, table_reference).await {
            Ok(metadata) => metadata,
            Err(error) => {
                // Comment metadata is optional; never fail registration over it.
                tracing::warn!(
                    dataset = %dataset.name,
                    source = %dataset.path(),
                    error = %error,
                    "Failed to query MySQL comments; registering without comment metadata"
                );
                return provider;
            }
        };

    if table_metadata.is_empty() && field_metadata.is_empty() {
        return provider;
    }

    data_components::metadata_enriched_table_provider(provider, table_metadata, field_metadata)
}

#[async_trait]
impl DataConnector for MySQL {
fn as_any(&self) -> &dyn Any {
Expand All @@ -299,8 +372,14 @@ impl DataConnector for MySQL {

// Call the inherent method directly instead of using Read trait
// (orphan rule prevents trait impl in external crate)
let table_reference = tbl.clone();
match self.mysql_factory.table_provider(tbl).await {
Ok(provider) => Ok(provider),
Ok(provider) => {
Ok(
enrich_with_mysql_comments(&self.pool, dataset, &table_reference, provider)
.await,
)
}
Err(e) => {
if let Some(err_source) = e.source()
&& let Some(dbconnection::Error::UndefinedTable {
Expand Down
1 change: 1 addition & 0 deletions crates/data-connectors/connector-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ secrecy.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tracing.workspace = true

[features]
default = []
77 changes: 73 additions & 4 deletions crates/data-connectors/connector-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use runtime::parameters::ParameterSpec;
use secrecy::SecretBox;
use snafu::prelude::*;
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -55,6 +56,7 @@ pub enum Error {
/// `PostgreSQL` data connector.
pub struct Postgres {
factory: PostgresTableFactory,
pool: Arc<PostgresConnectionPool>,
params: runtime::parameters::Parameters,
replication_metrics:
std::sync::Arc<data_components::postgres_replication::ReplicationMetricsCollector>,
Expand Down Expand Up @@ -200,9 +202,11 @@ impl DataConnectorFactory for PostgresFactory {
.unwrap_or(datafusion_table_providers::UnsupportedTypeAction::String);
let pool = pool.with_unsupported_type_action(unsupported_type_action);

let factory = PostgresTableFactory::new(Arc::new(pool));
let pool = Arc::new(pool);
let factory = PostgresTableFactory::new(Arc::clone(&pool));
Ok(Arc::new(Postgres {
factory,
pool,
params: params_for_replication,
replication_metrics:
data_components::postgres_replication::ReplicationMetricsCollector::new(
Expand Down Expand Up @@ -254,6 +258,68 @@ impl DataConnectorFactory for PostgresFactory {
}
}

/// Reads table-level and column-level comment metadata for `table_path` from
/// the PostgreSQL system catalogs.
///
/// The relation is resolved with `to_regclass($1)`. For each attribute with
/// `attnum > 0` that is not dropped, the query returns the table comment, the
/// column name, the column comment and the formatted source type, ordered by
/// `attnum`. The resulting rows are handed to
/// `data_components::postgres::provider::postgres_metadata_from_rows`.
///
/// # Errors
///
/// Returns the underlying error if a direct connection cannot be obtained from
/// `pool` or if the catalog query fails.
async fn postgres_comment_metadata(
    pool: &Arc<PostgresConnectionPool>,
    table_path: &str,
) -> std::result::Result<
    (HashMap<String, String>, data_components::FieldMetadata),
    Box<dyn std::error::Error + Send + Sync>,
> {
    // One row per live column; the table comment is repeated on every row.
    const COMMENT_QUERY: &str = "SELECT \
        obj_description(c.oid, 'pg_class') AS table_comment, \
        a.attname AS column_name, \
        col_description(c.oid, a.attnum) AS column_comment, \
        format_type(a.atttypid, a.atttypmod) AS column_source_type \
        FROM pg_catalog.pg_class c \
        JOIN pg_catalog.pg_attribute a \
        ON a.attrelid = c.oid \
        AND a.attnum > 0 \
        AND NOT a.attisdropped \
        WHERE c.oid = to_regclass($1) \
        ORDER BY a.attnum";

    let direct = pool.connect_direct().await?;
    let fetched = direct.conn.query(COMMENT_QUERY, &[&table_path]).await?;

    // Columns are read positionally, in the order of the SELECT list above.
    let column_tuples = fetched
        .iter()
        .map(|record| (record.get(0), record.get(1), record.get(2), record.get(3)));

    Ok(data_components::postgres::provider::postgres_metadata_from_rows(column_tuples))
}

/// Wraps `provider` with PostgreSQL comment metadata when any is available.
///
/// The lookup is keyed on `dataset.path()`. This is best-effort: if the comment
/// query fails, a warning is logged and the original `provider` is returned
/// unchanged. The original `provider` is also returned as-is when both the
/// table-level and field-level metadata are empty.
async fn enrich_with_postgres_comments(
    pool: &Arc<PostgresConnectionPool>,
    dataset: &Dataset,
    provider: Arc<dyn TableProvider>,
) -> Arc<dyn TableProvider> {
    let (table_metadata, field_metadata) =
        match postgres_comment_metadata(pool, dataset.path()).await {
            Ok(metadata) => metadata,
            Err(error) => {
                // Comment metadata is optional; never fail registration over it.
                tracing::warn!(
                    dataset = %dataset.name,
                    source = %dataset.path(),
                    error = %error,
                    "Failed to query PostgreSQL comments; registering without comment metadata"
                );
                return provider;
            }
        };

    if table_metadata.is_empty() && field_metadata.is_empty() {
        return provider;
    }

    data_components::metadata_enriched_table_provider(provider, table_metadata, field_metadata)
}

#[async_trait]
impl DataConnector for Postgres {
fn as_any(&self) -> &dyn Any {
Expand All @@ -269,7 +335,10 @@ impl DataConnector for Postgres {
.read_write_table_provider(dataset.path().into())
.await
{
Ok(provider) => Some(Ok(provider)),
Ok(provider) => Some(Ok(enrich_with_postgres_comments(
&self.pool, dataset, provider,
)
.await)),
Err(e) => {
if let Some(err_source) = e.source() {
match err_source.downcast_ref::<dbconnection::Error>() {
Expand Down Expand Up @@ -298,7 +367,7 @@ impl DataConnector for Postgres {
}
}

Some(Err(DataConnectorError::UnableToGetReadProvider {
Some(Err(DataConnectorError::UnableToGetReadWriteProvider {
dataconnector: "postgres".to_string(),
connector_component: ConnectorComponent::from(dataset),
source: e,
Expand All @@ -312,7 +381,7 @@ impl DataConnector for Postgres {
dataset: &Dataset,
) -> DataConnectorResult<Arc<dyn TableProvider>> {
match self.factory.table_provider(dataset.path().into()).await {
Ok(provider) => Ok(provider),
Ok(provider) => Ok(enrich_with_postgres_comments(&self.pool, dataset, provider).await),
Err(e) => {
if let Some(err_source) = e.source() {
match err_source.downcast_ref::<dbconnection::Error>() {
Expand Down
Loading