diff --git a/Cargo.toml b/Cargo.toml index e507b26..c18cef0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,8 +51,8 @@ skip-tests-with-docker = [] # Metadata provider backends metadata-duckdb = ["dep:duckdb"] metadata-postgres = ["dep:sqlx", "sqlx/postgres", "sqlx/chrono"] +metadata-mysql = ["dep:sqlx", "sqlx/mysql", "sqlx/chrono"] # Future: metadata-sqlite = ["sqlx", "sqlx/sqlite", "sqlx/chrono"] -# Future: metadata-mysql = ["sqlx", "sqlx/mysql", "sqlx/chrono"] # Encryption support for Parquet files encryption = ["parquet/encryption", "datafusion/parquet_encryption", "dep:base64", "dep:hex"] \ No newline at end of file diff --git a/examples/basic_query.rs b/examples/basic_query.rs index f596038..314394d 100644 --- a/examples/basic_query.rs +++ b/examples/basic_query.rs @@ -1,7 +1,7 @@ //! Basic DuckLake query example with snapshot isolation //! //! This example demonstrates how to: -//! 1. Create a DuckLake catalog from DuckDB or PostgreSQL +//! 1. Create a DuckLake catalog from DuckDB, PostgreSQL, or MySQL //! 2. Bind the catalog to a specific snapshot for query consistency //! 3. Register it with DataFusion //! 4. Execute a simple SELECT query @@ -29,11 +29,20 @@ //! "postgresql://user:password@localhost:5432/postgres" \ //! "SELECT * FROM main.users" //! ``` +//! +//! With MySQL catalog (requires --features metadata-mysql): +//! ```bash +//! cargo run --example basic_query --features metadata-mysql \ +//! "mysql://user:password@localhost:3306/database" \ +//! "SELECT * FROM main.users" +//! ``` use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::*; #[cfg(feature = "metadata-duckdb")] use datafusion_ducklake::DuckdbMetadataProvider; +#[cfg(feature = "metadata-mysql")] +use datafusion_ducklake::MySqlMetadataProvider; #[cfg(feature = "metadata-postgres")] use datafusion_ducklake::PostgresMetadataProvider; use datafusion_ducklake::{DuckLakeCatalog, MetadataProvider, register_ducklake_functions}; @@ -53,6 +62,9 @@ async fn main() -> Result<(), Box> { eprintln!( " PostgreSQL: cargo run --example basic_query --features metadata-postgres \"postgresql://...\" \"SQL\"" ); + eprintln!( + " MySQL: cargo run --example basic_query --features metadata-mysql \"mysql://...\" \"SQL\"" + ); exit(1); } let catalog_source = &args[1]; @@ -60,6 +72,7 @@ async fn main() -> Result<(), Box> { // Detect provider type based on input let is_postgres = catalog_source.starts_with("postgresql://"); + let is_mysql = catalog_source.starts_with("mysql://"); if is_postgres { #[cfg(not(feature = "metadata-postgres"))] @@ -77,6 +90,22 @@ async fn main() -> Result<(), Box> { println!("Current snapshot ID: {}", snapshot_id); run_query(provider, snapshot_id, sql).await?; } + } else if is_mysql { + #[cfg(not(feature = "metadata-mysql"))] + { + eprintln!("Error: MySQL support requires the 'metadata-mysql' feature"); + eprintln!("Run with: cargo run --example basic_query --features metadata-mysql"); + exit(1); + } + + #[cfg(feature = "metadata-mysql")] + { + println!("Connecting to MySQL catalog: {}", catalog_source); + let provider = Arc::new(MySqlMetadataProvider::new(catalog_source).await?); + let snapshot_id = provider.get_current_snapshot()?; + println!("Current snapshot ID: {}", snapshot_id); + run_query(provider, snapshot_id, sql).await?; + } } else { #[cfg(feature = "metadata-duckdb")] { diff --git a/src/error.rs b/src/error.rs index 0f0d7b2..89c295c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,8 +18,8 @@ pub enum DuckLakeError { #[error("DuckDB error: {0}")] DuckDb(#[from] duckdb::Error), - /// sqlx database error (for PostgreSQL metadata provider) - #[cfg(feature = "metadata-postgres")] + /// sqlx database error (for PostgreSQL/MySQL metadata providers) + #[cfg(any(feature = "metadata-postgres", feature = "metadata-mysql"))] #[error("Database error: {0}")] Sqlx(#[from] sqlx::Error), diff --git a/src/lib.rs b/src/lib.rs index e0b2f2a..3fdce91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,8 @@ pub mod types; // Metadata providers (feature-gated) #[cfg(feature = "metadata-duckdb")] pub mod metadata_provider_duckdb; +#[cfg(feature = "metadata-mysql")] +pub mod metadata_provider_mysql; #[cfg(feature = "metadata-postgres")] pub mod metadata_provider_postgres; @@ -69,5 +71,7 @@ pub use table_functions::register_ducklake_functions; // Re-export metadata providers (feature-gated) #[cfg(feature = "metadata-duckdb")] pub use metadata_provider_duckdb::DuckdbMetadataProvider; +#[cfg(feature = "metadata-mysql")] +pub use metadata_provider_mysql::MySqlMetadataProvider; #[cfg(feature = "metadata-postgres")] pub use metadata_provider_postgres::PostgresMetadataProvider; diff --git a/src/metadata_provider.rs b/src/metadata_provider.rs index 5af1510..4ebb0cd 100644 --- a/src/metadata_provider.rs +++ b/src/metadata_provider.rs @@ -405,7 +405,7 @@ pub trait MetadataProvider: Send + Sync + std::fmt::Debug { ) -> Result>; } -#[cfg(feature = "metadata-postgres")] +#[cfg(any(feature = "metadata-postgres", feature = "metadata-mysql"))] /// Helper function to bridge async sqlx operations to sync MetadataProvider trait pub(crate) fn block_on(f: F) -> T where diff --git a/src/metadata_provider_mysql.rs b/src/metadata_provider_mysql.rs new file mode 100644 index 0000000..9a150b4 --- /dev/null +++ b/src/metadata_provider_mysql.rs @@ -0,0 +1,555 @@ +//! MySQL metadata provider for DuckLake catalogs. + +use crate::Result; +use crate::metadata_provider::{ + ColumnWithTable, DataFileChange, DeleteFileChange, DuckLakeFileData, DuckLakeTableColumn, + DuckLakeTableFile, FileWithTable, MetadataProvider, SchemaMetadata, SnapshotMetadata, + TableMetadata, TableWithSchema, block_on, +}; +use sqlx::Row; +use sqlx::mysql::{MySqlPool, MySqlPoolOptions}; +use sqlx::types::chrono::NaiveDateTime; + +/// MySQL-based metadata provider for DuckLake catalogs. +#[derive(Debug, Clone)] +pub struct MySqlMetadataProvider { + pub pool: MySqlPool, +} + +impl MySqlMetadataProvider { + /// Creates a new provider for an existing DuckLake catalog. + pub async fn new(connection_string: &str) -> Result { + let pool = MySqlPoolOptions::new() + .max_connections(5) + .connect(connection_string) + .await?; + + Ok(Self { + pool, + }) + } +} + +impl MetadataProvider for MySqlMetadataProvider { + fn get_current_snapshot(&self) -> Result { + block_on(async { + let row = sqlx::query("SELECT COALESCE(MAX(snapshot_id), 0) FROM ducklake_snapshot") + .fetch_one(&self.pool) + .await?; + Ok(row.try_get(0)?) + }) + } + + fn get_data_path(&self) -> Result { + block_on(async { + let row = sqlx::query( + "SELECT value FROM ducklake_metadata WHERE `key` = ? AND scope IS NULL", + ) + .bind("data_path") + .fetch_optional(&self.pool) + .await?; + + match row { + Some(r) => Ok(r.try_get(0)?), + None => Err(crate::error::DuckLakeError::InvalidConfig( + "Missing required catalog metadata: 'data_path' not configured. \ + The catalog may be uninitialized or corrupted." + .to_string(), + )), + } + }) + } + + fn list_snapshots(&self) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT snapshot_id, snapshot_time + FROM ducklake_snapshot ORDER BY snapshot_id", + ) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + let snapshot_id: i64 = row.try_get(0)?; + let timestamp: Option = row.try_get(1)?; + let timestamp_str = timestamp + .map(|ts: NaiveDateTime| ts.format("%Y-%m-%d %H:%M:%S%.6f").to_string()); + + Ok(SnapshotMetadata { + snapshot_id, + timestamp: timestamp_str, + }) + }) + .collect() + }) + } + + fn list_schemas(&self, snapshot_id: i64) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT schema_id, schema_name, path, path_is_relative FROM ducklake_schema + WHERE ? >= begin_snapshot AND (? < end_snapshot OR end_snapshot IS NULL)", + ) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + Ok(SchemaMetadata { + schema_id: row.try_get(0)?, + schema_name: row.try_get(1)?, + path: row.try_get(2)?, + path_is_relative: row.try_get(3)?, + }) + }) + .collect() + }) + } + + fn list_tables(&self, schema_id: i64, snapshot_id: i64) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT table_id, table_name, path, path_is_relative FROM ducklake_table + WHERE schema_id = ? + AND ? >= begin_snapshot + AND (? < end_snapshot OR end_snapshot IS NULL)", + ) + .bind(schema_id) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + Ok(TableMetadata { + table_id: row.try_get(0)?, + table_name: row.try_get(1)?, + path: row.try_get(2)?, + path_is_relative: row.try_get(3)?, + }) + }) + .collect() + }) + } + + fn get_table_structure(&self, table_id: i64) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT column_id, column_name, column_type, nulls_allowed + FROM ducklake_column + WHERE table_id = ? + ORDER BY column_order", + ) + .bind(table_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + let nulls_allowed: Option = row.try_get(3)?; + Ok(DuckLakeTableColumn { + column_id: row.try_get(0)?, + column_name: row.try_get(1)?, + column_type: row.try_get(2)?, + is_nullable: nulls_allowed.unwrap_or(true), + }) + }) + .collect() + }) + } + + fn get_table_files_for_select( + &self, + table_id: i64, + snapshot_id: i64, + ) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT + data.data_file_id, + data.path AS data_file_path, + data.path_is_relative AS data_path_is_relative, + data.file_size_bytes AS data_file_size, + data.footer_size AS data_footer_size, + data.encryption_key AS data_encryption_key, + del.delete_file_id, + del.path AS delete_file_path, + del.path_is_relative AS delete_path_is_relative, + del.file_size_bytes AS delete_file_size, + del.footer_size AS delete_footer_size, + del.encryption_key AS delete_encryption_key, + del.delete_count + FROM ducklake_data_file AS data + LEFT JOIN ducklake_delete_file AS del + ON data.data_file_id = del.data_file_id + AND del.table_id = ? + AND ? >= del.begin_snapshot + AND (? < del.end_snapshot OR del.end_snapshot IS NULL) + WHERE data.table_id = ? + AND ? >= data.begin_snapshot + AND (? < data.end_snapshot OR data.end_snapshot IS NULL)", + ) + .bind(table_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(table_id) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + let data_file = DuckLakeFileData { + path: row.try_get(1)?, + path_is_relative: row.try_get(2)?, + file_size_bytes: row.try_get(3)?, + footer_size: row.try_get(4)?, + encryption_key: row.try_get(5)?, + }; + + let delete_file = if row.try_get::, _>(6)?.is_some() { + Some(DuckLakeFileData { + path: row.try_get(7)?, + path_is_relative: row.try_get(8)?, + file_size_bytes: row.try_get(9)?, + footer_size: row.try_get(10)?, + encryption_key: row.try_get(11)?, + }) + } else { + None + }; + + Ok(DuckLakeTableFile { + file: data_file, + delete_file, + row_id_start: None, + snapshot_id: None, + max_row_count: None, + }) + }) + .collect() + }) + } + + fn get_schema_by_name(&self, name: &str, snapshot_id: i64) -> Result> { + block_on(async { + let row = sqlx::query( + "SELECT schema_id, schema_name, path, path_is_relative FROM ducklake_schema + WHERE schema_name = ? + AND ? >= begin_snapshot + AND (? < end_snapshot OR end_snapshot IS NULL)", + ) + .bind(name) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_optional(&self.pool) + .await?; + + match row { + Some(r) => Ok(Some(SchemaMetadata { + schema_id: r.try_get(0)?, + schema_name: r.try_get(1)?, + path: r.try_get(2)?, + path_is_relative: r.try_get(3)?, + })), + None => Ok(None), + } + }) + } + + fn get_table_by_name( + &self, + schema_id: i64, + name: &str, + snapshot_id: i64, + ) -> Result> { + block_on(async { + let row = sqlx::query( + "SELECT table_id, table_name, path, path_is_relative FROM ducklake_table + WHERE schema_id = ? + AND table_name = ? + AND ? >= begin_snapshot + AND (? < end_snapshot OR end_snapshot IS NULL)", + ) + .bind(schema_id) + .bind(name) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_optional(&self.pool) + .await?; + + match row { + Some(r) => Ok(Some(TableMetadata { + table_id: r.try_get(0)?, + table_name: r.try_get(1)?, + path: r.try_get(2)?, + path_is_relative: r.try_get(3)?, + })), + None => Ok(None), + } + }) + } + + fn table_exists(&self, schema_id: i64, name: &str, snapshot_id: i64) -> Result { + block_on(async { + // MySQL doesn't support SELECT EXISTS(...) the same way PostgreSQL does + // Use COUNT instead + let row = sqlx::query( + "SELECT COUNT(*) FROM ducklake_table + WHERE schema_id = ? + AND table_name = ? + AND ? >= begin_snapshot + AND (? < end_snapshot OR end_snapshot IS NULL)", + ) + .bind(schema_id) + .bind(name) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_one(&self.pool) + .await?; + + let count: i64 = row.try_get(0)?; + Ok(count > 0) + }) + } + + fn list_all_tables(&self, snapshot_id: i64) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT s.schema_name, t.table_id, t.table_name, t.path, t.path_is_relative + FROM ducklake_schema s + JOIN ducklake_table t ON s.schema_id = t.schema_id + WHERE ? >= s.begin_snapshot + AND (? < s.end_snapshot OR s.end_snapshot IS NULL) + AND ? >= t.begin_snapshot + AND (? < t.end_snapshot OR t.end_snapshot IS NULL) + ORDER BY s.schema_name, t.table_name", + ) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + let schema_name: String = row.try_get(0)?; + let table = TableMetadata { + table_id: row.try_get(1)?, + table_name: row.try_get(2)?, + path: row.try_get(3)?, + path_is_relative: row.try_get(4)?, + }; + Ok(TableWithSchema { + schema_name, + table, + }) + }) + .collect() + }) + } + + fn list_all_columns(&self, snapshot_id: i64) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT s.schema_name, t.table_name, c.column_id, c.column_name, c.column_type, c.nulls_allowed + FROM ducklake_schema s + JOIN ducklake_table t ON s.schema_id = t.schema_id + JOIN ducklake_column c ON t.table_id = c.table_id + WHERE ? >= s.begin_snapshot + AND (? < s.end_snapshot OR s.end_snapshot IS NULL) + AND ? >= t.begin_snapshot + AND (? < t.end_snapshot OR t.end_snapshot IS NULL) + ORDER BY s.schema_name, t.table_name, c.column_order", + ) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + let schema_name: String = row.try_get(0)?; + let table_name: String = row.try_get(1)?; + let nulls_allowed: Option = row.try_get(5)?; + let column = DuckLakeTableColumn { + column_id: row.try_get(2)?, + column_name: row.try_get(3)?, + column_type: row.try_get(4)?, + is_nullable: nulls_allowed.unwrap_or(true), + }; + Ok(ColumnWithTable { + schema_name, + table_name, + column, + }) + }) + .collect() + }) + } + + fn list_all_files(&self, snapshot_id: i64) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT + s.schema_name, + t.table_name, + data.data_file_id, + data.path AS data_file_path, + data.path_is_relative AS data_path_is_relative, + data.file_size_bytes AS data_file_size, + data.footer_size AS data_footer_size, + data.encryption_key AS data_encryption_key, + del.delete_file_id, + del.path AS delete_file_path, + del.path_is_relative AS delete_path_is_relative, + del.file_size_bytes AS delete_file_size, + del.footer_size AS delete_footer_size, + del.encryption_key AS delete_encryption_key, + del.delete_count + FROM ducklake_schema s + JOIN ducklake_table t ON s.schema_id = t.schema_id + JOIN ducklake_data_file data ON t.table_id = data.table_id + LEFT JOIN ducklake_delete_file del + ON data.data_file_id = del.data_file_id + AND del.table_id = t.table_id + AND ? >= del.begin_snapshot + AND (? < del.end_snapshot OR del.end_snapshot IS NULL) + WHERE ? >= s.begin_snapshot + AND (? < s.end_snapshot OR s.end_snapshot IS NULL) + AND ? >= t.begin_snapshot + AND (? < t.end_snapshot OR t.end_snapshot IS NULL) + AND ? >= data.begin_snapshot + AND (? < data.end_snapshot OR data.end_snapshot IS NULL) + ORDER BY s.schema_name, t.table_name, data.path", + ) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .bind(snapshot_id) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + let data_file = DuckLakeFileData { + path: row.try_get(3)?, + path_is_relative: row.try_get(4)?, + file_size_bytes: row.try_get(5)?, + footer_size: row.try_get(6)?, + encryption_key: row.try_get(7)?, + }; + + let delete_file = if row.try_get::, _>(8)?.is_some() { + Some(DuckLakeFileData { + path: row.try_get(9)?, + path_is_relative: row.try_get(10)?, + file_size_bytes: row.try_get(11)?, + footer_size: row.try_get(12)?, + encryption_key: row.try_get(13)?, + }) + } else { + None + }; + + Ok(FileWithTable { + schema_name: row.try_get(0)?, + table_name: row.try_get(1)?, + file: DuckLakeTableFile { + file: data_file, + delete_file, + row_id_start: None, + snapshot_id: None, + max_row_count: row.try_get(14)?, + }, + }) + }) + .collect() + }) + } + + fn get_data_files_added_between_snapshots( + &self, + table_id: i64, + start_snapshot: i64, + end_snapshot: i64, + ) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT + data.begin_snapshot, + data.path, + data.path_is_relative, + data.file_size_bytes, + data.footer_size, + data.encryption_key + FROM ducklake_data_file AS data + WHERE data.table_id = ? + AND data.begin_snapshot > ? + AND data.begin_snapshot <= ? + ORDER BY data.begin_snapshot", + ) + .bind(table_id) + .bind(start_snapshot) + .bind(end_snapshot) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + Ok(DataFileChange { + begin_snapshot: row.try_get(0)?, + path: row.try_get(1)?, + path_is_relative: row.try_get(2)?, + file_size_bytes: row.try_get(3)?, + footer_size: row.try_get(4)?, + encryption_key: row.try_get(5)?, + }) + }) + .collect() + }) + } + + fn get_delete_files_added_between_snapshots( + &self, + table_id: i64, + start_snapshot: i64, + end_snapshot: i64, + ) -> Result> { + block_on(async { + let rows = sqlx::query( + "SELECT del.begin_snapshot + FROM ducklake_delete_file AS del + WHERE del.table_id = ? + AND del.begin_snapshot > ? + AND del.begin_snapshot <= ? + ORDER BY del.begin_snapshot", + ) + .bind(table_id) + .bind(start_snapshot) + .bind(end_snapshot) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|row| { + Ok(DeleteFileChange { + begin_snapshot: row.try_get(0)?, + }) + }) + .collect() + }) + } +} diff --git a/src/metadata_provider_postgres.rs b/src/metadata_provider_postgres.rs index eb1fa8a..22d250c 100644 --- a/src/metadata_provider_postgres.rs +++ b/src/metadata_provider_postgres.rs @@ -78,9 +78,8 @@ impl MetadataProvider for PostgresMetadataProvider { fn get_data_path(&self) -> Result { block_on(async { let row = - sqlx::query("SELECT value FROM ducklake_metadata WHERE key = $1 AND scope = $2") + sqlx::query("SELECT value FROM ducklake_metadata WHERE key = $1 AND scope IS NULL") .bind("data_path") - .bind("") .fetch_optional(&self.pool) .await?; diff --git a/tests/mysql_metadata_provider_test.rs b/tests/mysql_metadata_provider_test.rs new file mode 100644 index 0000000..f0e43b9 --- /dev/null +++ b/tests/mysql_metadata_provider_test.rs @@ -0,0 +1,1022 @@ +#![cfg(feature = "metadata-mysql")] +//! MySQL metadata provider tests +//! +//! This test suite verifies the MySQL metadata provider implementation, +//! including all MetadataProvider trait methods, schema initialization, +//! concurrent access, and error handling. +//! +//! ## Test Setup +//! +//! Tests use testcontainers to spin up a temporary MySQL instance. +//! Each test creates its own database with test data to ensure isolation. +//! +//! ## Coverage +//! +//! - Schema initialization (idempotent) +//! - All MetadataProvider trait methods +//! - Snapshot isolation and temporal queries +//! - Concurrent access and thread safety +//! - Error handling and edge cases + +mod common; + +use datafusion::prelude::*; +use datafusion_ducklake::{ + DuckLakeCatalog, DuckdbMetadataProvider, MySqlMetadataProvider, + metadata_provider::MetadataProvider, +}; +use sqlx::MySqlPool; +use std::sync::Arc; +use tempfile::TempDir; +use testcontainers::runners::AsyncRunner; +use testcontainers_modules::mysql::Mysql; + +/// Initialize DuckLake catalog schema in MySQL (for tests only) +async fn init_schema(pool: &MySqlPool) -> anyhow::Result<()> { + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_snapshot ( + snapshot_id BIGINT PRIMARY KEY, + snapshot_time DATETIME(6) + )", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_schema ( + schema_id BIGINT PRIMARY KEY, + schema_name VARCHAR(255) NOT NULL, + path VARCHAR(1024) NOT NULL, + path_is_relative BOOLEAN NOT NULL, + begin_snapshot BIGINT NOT NULL, + end_snapshot BIGINT + )", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_table ( + table_id BIGINT PRIMARY KEY, + schema_id BIGINT NOT NULL, + table_name VARCHAR(255) NOT NULL, + path VARCHAR(1024) NOT NULL, + path_is_relative BOOLEAN NOT NULL, + begin_snapshot BIGINT NOT NULL, + end_snapshot BIGINT, + FOREIGN KEY (schema_id) REFERENCES ducklake_schema(schema_id) + )", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_column ( + column_id BIGINT PRIMARY KEY, + table_id BIGINT NOT NULL, + column_name VARCHAR(255) NOT NULL, + column_type VARCHAR(255) NOT NULL, + column_order INTEGER NOT NULL, + nulls_allowed BOOLEAN, + FOREIGN KEY (table_id) REFERENCES ducklake_table(table_id) + )", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_data_file ( + data_file_id BIGINT PRIMARY KEY, + table_id BIGINT NOT NULL, + path VARCHAR(1024) NOT NULL, + path_is_relative BOOLEAN NOT NULL, + file_size_bytes BIGINT NOT NULL, + footer_size BIGINT, + encryption_key VARCHAR(255), + begin_snapshot BIGINT NOT NULL DEFAULT 1, + end_snapshot BIGINT, + FOREIGN KEY (table_id) REFERENCES ducklake_table(table_id) + )", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_delete_file ( + delete_file_id BIGINT PRIMARY KEY, + data_file_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + path VARCHAR(1024) NOT NULL, + path_is_relative BOOLEAN NOT NULL, + file_size_bytes BIGINT NOT NULL, + footer_size BIGINT, + encryption_key VARCHAR(255), + delete_count BIGINT, + begin_snapshot BIGINT NOT NULL, + end_snapshot BIGINT, + FOREIGN KEY (data_file_id) REFERENCES ducklake_data_file(data_file_id), + FOREIGN KEY (table_id) REFERENCES ducklake_table(table_id) + )", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS ducklake_metadata ( + `key` VARCHAR(255) NOT NULL, + value VARCHAR(1024) NOT NULL, + scope VARCHAR(255), + scope_id BIGINT, + PRIMARY KEY (`key`) + )", + ) + .execute(pool) + .await?; + + // MySQL indexes + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_schema_snapshot ON ducklake_schema(begin_snapshot, end_snapshot)", + ) + .execute(pool) + .await + .ok(); // Ignore if already exists + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_table_schema ON ducklake_table(schema_id)") + .execute(pool) + .await + .ok(); + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_table_snapshot ON ducklake_table(begin_snapshot, end_snapshot)", + ) + .execute(pool) + .await + .ok(); + + Ok(()) +} + +/// Helper to create a MySQL provider with initialized schema +async fn create_mysql_provider() +-> anyhow::Result<(MySqlMetadataProvider, testcontainers::ContainerAsync)> { + let container = Mysql::default().start().await?; + + let host = "127.0.0.1"; + let port = container.get_host_port_ipv4(3306).await?; + let conn_str = format!("mysql://root@{}:{}/test", host, port); + + let provider = MySqlMetadataProvider::new(&conn_str) + .await + .expect("Failed to create provider"); + init_schema(&provider.pool).await?; + + Ok((provider, container)) +} + +/// Helper to populate test data in MySQL +async fn populate_test_data(provider: &MySqlMetadataProvider) -> anyhow::Result<()> { + let pool = &provider.pool; + + // Insert snapshots + sqlx::query("INSERT INTO ducklake_snapshot (snapshot_id, snapshot_time) VALUES (?, NOW())") + .bind(1i64) + .execute(pool) + .await?; + + sqlx::query("INSERT INTO ducklake_snapshot (snapshot_id, snapshot_time) VALUES (?, NOW())") + .bind(2i64) + .execute(pool) + .await?; + + // Insert metadata (data_path) + sqlx::query( + "INSERT INTO ducklake_metadata (`key`, value, scope, scope_id) VALUES (?, ?, NULL, NULL)", + ) + .bind("data_path") + .bind("file:///tmp/ducklake_data/") + .execute(pool) + .await?; + + // Insert schema + sqlx::query( + "INSERT INTO ducklake_schema (schema_id, schema_name, path, path_is_relative, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?)" + ) + .bind(1i64) + .bind("test_schema") + .bind("test_schema/") + .bind(true) + .bind(1i64) + .bind(None::) + .execute(pool) + .await?; + + // Insert another schema (only in snapshot 2) + sqlx::query( + "INSERT INTO ducklake_schema (schema_id, schema_name, path, path_is_relative, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?)" + ) + .bind(2i64) + .bind("schema2") + .bind("schema2/") + .bind(true) + .bind(2i64) + .bind(None::) + .execute(pool) + .await?; + + // Insert table + sqlx::query( + "INSERT INTO ducklake_table (table_id, schema_id, table_name, path, path_is_relative, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(1i64) + .bind(1i64) + .bind("users") + .bind("users/") + .bind(true) + .bind(1i64) + .bind(None::) + .execute(pool) + .await?; + + // Insert another table (only in snapshot 2) + sqlx::query( + "INSERT INTO ducklake_table (table_id, schema_id, table_name, path, path_is_relative, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(2i64) + .bind(1i64) + .bind("products") + .bind("products/") + .bind(true) + .bind(2i64) + .bind(None::) + .execute(pool) + .await?; + + // Insert columns for users table + sqlx::query( + "INSERT INTO ducklake_column (column_id, table_id, column_name, column_type, column_order, nulls_allowed) + VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind(1i64) + .bind(1i64) + .bind("id") + .bind("INT") + .bind(0i32) + .bind(false) + .execute(pool) + .await?; + + sqlx::query( + "INSERT INTO ducklake_column (column_id, table_id, column_name, column_type, column_order, nulls_allowed) + VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind(2i64) + .bind(1i64) + .bind("name") + .bind("VARCHAR") + .bind(1i32) + .bind(true) + .execute(pool) + .await?; + + sqlx::query( + "INSERT INTO ducklake_column (column_id, table_id, column_name, column_type, column_order, nulls_allowed) + VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind(3i64) + .bind(1i64) + .bind("email") + .bind("VARCHAR") + .bind(2i32) + .bind(true) + .execute(pool) + .await?; + + // Insert data file + sqlx::query( + "INSERT INTO ducklake_data_file (data_file_id, table_id, path, path_is_relative, file_size_bytes, footer_size, begin_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(1i64) + .bind(1i64) + .bind("data_001.parquet") + .bind(true) + .bind(1024i64) + .bind(Some(128i64)) + .bind(1i64) + .execute(pool) + .await?; + + sqlx::query( + "INSERT INTO ducklake_data_file (data_file_id, table_id, path, path_is_relative, file_size_bytes, footer_size, begin_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(2i64) + .bind(1i64) + .bind("data_002.parquet") + .bind(true) + .bind(2048i64) + .bind(Some(256i64)) + .bind(1i64) + .execute(pool) + .await?; + + // Insert delete file for first data file + sqlx::query( + "INSERT INTO ducklake_delete_file (delete_file_id, data_file_id, table_id, path, path_is_relative, + file_size_bytes, footer_size, delete_count, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + ) + .bind(1i64) + .bind(1i64) + .bind(1i64) + .bind("data_001.delete.parquet") + .bind(true) + .bind(512i64) + .bind(Some(64i64)) + .bind(Some(5i64)) + .bind(1i64) + .bind(None::) + .execute(pool) + .await?; + + Ok(()) +} + +/// Helper to populate MySQL with metadata from a DuckDB-created catalog +async fn populate_from_duckdb_catalog( + provider: &MySqlMetadataProvider, +) -> anyhow::Result<(String, TempDir)> { + // Step 1: Create temporary directory and DuckDB catalog with real Parquet files + let temp_dir = TempDir::new()?; + let catalog_path = temp_dir.path().join("source.ducklake"); + common::create_catalog_no_deletes(&catalog_path)?; + + // Step 2: Read metadata from DuckDB catalog + let duckdb_provider = DuckdbMetadataProvider::new(catalog_path.to_string_lossy().to_string())?; + + let data_path = duckdb_provider.get_data_path()?; + let snapshots = duckdb_provider.list_snapshots()?; + let current_snapshot = snapshots + .last() + .ok_or_else(|| anyhow::anyhow!("No snapshots found"))?; + + let schemas = duckdb_provider.list_schemas(current_snapshot.snapshot_id)?; + + // Step 3: Populate MySQL with metadata from DuckDB + let pool = &provider.pool; + + // Insert snapshots + for snapshot in &snapshots { + let timestamp_value: Option = + snapshot.timestamp.as_ref().and_then(|ts_str| { + sqlx::types::chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S%.6f") + .ok() + }); + + sqlx::query("INSERT INTO ducklake_snapshot (snapshot_id, snapshot_time) VALUES (?, ?)") + .bind(snapshot.snapshot_id) + .bind(timestamp_value) + .execute(pool) + .await?; + } + + // Insert data_path metadata + sqlx::query( + "INSERT INTO ducklake_metadata (`key`, value, scope, scope_id) VALUES (?, ?, NULL, NULL)", + ) + .bind("data_path") + .bind(&data_path) + .execute(pool) + .await?; + + // Insert schemas, tables, columns, and files + for schema in &schemas { + sqlx::query( + "INSERT INTO ducklake_schema (schema_id, schema_name, path, path_is_relative, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?)" + ) + .bind(schema.schema_id) + .bind(&schema.schema_name) + .bind(&schema.path) + .bind(schema.path_is_relative) + .bind(1i64) + .bind(None::) + .execute(pool) + .await?; + + let tables = duckdb_provider.list_tables(schema.schema_id, current_snapshot.snapshot_id)?; + + for table in &tables { + sqlx::query( + "INSERT INTO ducklake_table (table_id, schema_id, table_name, path, path_is_relative, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(table.table_id) + .bind(schema.schema_id) + .bind(&table.table_name) + .bind(&table.path) + .bind(table.path_is_relative) + .bind(1i64) + .bind(None::) + .execute(pool) + .await?; + + let columns = duckdb_provider.get_table_structure(table.table_id)?; + + for (order, column) in columns.iter().enumerate() { + sqlx::query( + "INSERT INTO ducklake_column (column_id, table_id, column_name, column_type, column_order, nulls_allowed) + VALUES (?, ?, ?, ?, ?, ?)" + ) + .bind(column.column_id) + .bind(table.table_id) + .bind(&column.column_name) + .bind(&column.column_type) + .bind(order as i32) + .bind(column.is_nullable) + .execute(pool) + .await?; + } + + let files = duckdb_provider + .get_table_files_for_select(table.table_id, current_snapshot.snapshot_id)?; + + for (file_idx, file) in files.iter().enumerate() { + let data_file_id = table.table_id * 1000 + file_idx as i64 + 1; + + sqlx::query( + "INSERT INTO ducklake_data_file (data_file_id, table_id, path, path_is_relative, file_size_bytes, footer_size, begin_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(data_file_id) + .bind(table.table_id) + .bind(&file.file.path) + .bind(file.file.path_is_relative) + .bind(file.file.file_size_bytes) + .bind(file.file.footer_size) + .bind(1i64) + .execute(pool) + .await?; + + if let Some(delete_file) = &file.delete_file { + let delete_file_id = data_file_id; + + sqlx::query( + "INSERT INTO ducklake_delete_file (delete_file_id, data_file_id, table_id, path, path_is_relative, + file_size_bytes, footer_size, delete_count, begin_snapshot, end_snapshot) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + ) + .bind(delete_file_id) + .bind(data_file_id) + .bind(table.table_id) + .bind(&delete_file.path) + .bind(delete_file.path_is_relative) + .bind(delete_file.file_size_bytes) + .bind(delete_file.footer_size) + .bind(None::) + .bind(1i64) + .bind(None::) + .execute(pool) + .await?; + } + } + } + } + + Ok((data_path, temp_dir)) +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_schema_initialization_idempotent() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + // Initialize schema again - should be idempotent + init_schema(&provider.pool) + .await + .expect("Schema initialization should be idempotent"); + + // Verify tables exist by querying them + let result = provider.get_current_snapshot(); + assert!(result.is_ok(), "Should be able to query after init"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_get_current_snapshot() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + // Initially should be 0 (no snapshots) + let snapshot_id = provider + .get_current_snapshot() + .expect("Should get current snapshot"); + assert_eq!(snapshot_id, 0, "Should be 0 when no snapshots exist"); + + // Populate test data + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // Should now return 2 (max snapshot_id) + let snapshot_id = provider + .get_current_snapshot() + .expect("Should get current snapshot"); + assert_eq!(snapshot_id, 2, "Should return max snapshot_id"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_get_data_path() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let data_path = provider.get_data_path().expect("Should get data path"); + + assert_eq!(data_path, "file:///tmp/ducklake_data/"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_list_snapshots() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let snapshots = provider.list_snapshots().expect("Should list snapshots"); + + assert_eq!(snapshots.len(), 2, "Should have 2 snapshots"); + assert_eq!(snapshots[0].snapshot_id, 1); + assert_eq!(snapshots[1].snapshot_id, 2); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_list_schemas_snapshot_isolation() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // Snapshot 1 should only see test_schema + let schemas = provider + .list_schemas(1) + .expect("Should list schemas for snapshot 1"); + + assert_eq!(schemas.len(), 1, "Snapshot 1 should have 1 schema"); + assert_eq!(schemas[0].schema_name, "test_schema"); + + // Snapshot 2 should see both schemas + let schemas = provider + .list_schemas(2) + .expect("Should list schemas for snapshot 2"); + + assert_eq!(schemas.len(), 2, "Snapshot 2 should have 2 schemas"); + + let schema_names: Vec<_> = schemas.iter().map(|s| s.schema_name.as_str()).collect(); + assert!(schema_names.contains(&"test_schema")); + assert!(schema_names.contains(&"schema2")); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_get_schema_by_name() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // Should find test_schema + let schema = provider + .get_schema_by_name("test_schema", 1) + .expect("Should get schema by name"); + + assert!(schema.is_some(), "Should find test_schema"); + let schema = schema.unwrap(); + assert_eq!(schema.schema_name, "test_schema"); + assert_eq!(schema.schema_id, 1); + + // Should not find non-existent schema + let schema = provider + .get_schema_by_name("nonexistent", 1) + .expect("Should handle non-existent schema"); + + assert!(schema.is_none(), "Should not find nonexistent schema"); + + // schema2 should not be visible in snapshot 1 + let schema = provider + .get_schema_by_name("schema2", 1) + .expect("Should handle schema not in snapshot"); + + assert!( + schema.is_none(), + "schema2 should not be visible in snapshot 1" + ); + + // schema2 should be visible in snapshot 2 + let schema = provider + .get_schema_by_name("schema2", 2) + .expect("Should get schema by name"); + + assert!(schema.is_some(), "schema2 should be visible in snapshot 2"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_list_tables() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // Snapshot 1 should only see users table + let tables = provider.list_tables(1, 1).expect("Should list tables"); + + assert_eq!(tables.len(), 1, "Snapshot 1 should have 1 table"); + assert_eq!(tables[0].table_name, "users"); + + // Snapshot 2 should see both tables + let tables = provider.list_tables(1, 2).expect("Should list tables"); + + assert_eq!(tables.len(), 2, "Snapshot 2 should have 2 tables"); + + let table_names: Vec<_> = tables.iter().map(|t| t.table_name.as_str()).collect(); + assert!(table_names.contains(&"users")); + assert!(table_names.contains(&"products")); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_get_table_by_name() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // Should find users table + let table = provider + .get_table_by_name(1, "users", 1) + .expect("Should get table by name"); + + assert!(table.is_some(), "Should find users table"); + let table = table.unwrap(); + assert_eq!(table.table_name, "users"); + assert_eq!(table.table_id, 1); + + // Should not find non-existent table + let table = provider + .get_table_by_name(1, "nonexistent", 1) + .expect("Should handle non-existent table"); + + assert!(table.is_none(), "Should not find nonexistent table"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_table_exists() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // users table should exist + let exists = provider + .table_exists(1, "users", 1) + .expect("Should check if table exists"); + + assert!(exists, "users table should exist"); + + // nonexistent table should not exist + let exists = provider + .table_exists(1, "nonexistent", 1) + .expect("Should check if table exists"); + + assert!(!exists, "nonexistent table should not exist"); + + // products table should not exist in snapshot 1 + let exists = provider + .table_exists(1, "products", 1) + .expect("Should check if table exists"); + + assert!(!exists, "products table should not exist in snapshot 1"); + + // products table should exist in snapshot 2 + let exists = provider + .table_exists(1, "products", 2) + .expect("Should check if table exists"); + + assert!(exists, "products table should exist in snapshot 2"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_get_table_structure() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let columns = provider + .get_table_structure(1) + .expect("Should get table structure"); + + assert_eq!(columns.len(), 3, "users table should have 3 columns"); + + assert_eq!(columns[0].column_name, "id"); + assert_eq!(columns[0].column_type, "INT"); + + assert_eq!(columns[1].column_name, "name"); + assert_eq!(columns[1].column_type, "VARCHAR"); + + assert_eq!(columns[2].column_name, "email"); + assert_eq!(columns[2].column_type, "VARCHAR"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_get_table_files_for_select() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let files = provider + .get_table_files_for_select(1, 1) + .expect("Should get table files"); + + assert_eq!(files.len(), 2, "Should have 2 data files"); + + // First file should have a delete file + assert_eq!(files[0].file.path, "data_001.parquet"); + assert_eq!(files[0].file.file_size_bytes, 1024); + assert_eq!(files[0].file.footer_size, Some(128)); + assert!( + files[0].delete_file.is_some(), + "First file should have delete file" + ); + + let delete_file = files[0].delete_file.as_ref().unwrap(); + assert_eq!(delete_file.path, "data_001.delete.parquet"); + assert_eq!(delete_file.file_size_bytes, 512); + + // Second file should not have a delete file + assert_eq!(files[1].file.path, "data_002.parquet"); + assert_eq!(files[1].file.file_size_bytes, 2048); + assert_eq!(files[1].file.footer_size, Some(256)); + assert!( + files[1].delete_file.is_none(), + "Second file should not have delete file" + ); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_list_all_tables() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + // Snapshot 1 should only see 1 table + let tables = provider.list_all_tables(1).expect("Should list all tables"); + + assert_eq!(tables.len(), 1, "Snapshot 1 should have 1 table"); + assert_eq!(tables[0].schema_name, "test_schema"); + assert_eq!(tables[0].table.table_name, "users"); + + // Snapshot 2 should see 2 tables + let tables = provider.list_all_tables(2).expect("Should list all tables"); + + assert_eq!(tables.len(), 2, "Snapshot 2 should have 2 tables"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_list_all_columns() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let columns = provider + .list_all_columns(1) + .expect("Should list all columns"); + + assert_eq!(columns.len(), 3, "Should have 3 columns from users table"); + + assert_eq!(columns[0].schema_name, "test_schema"); + assert_eq!(columns[0].table_name, "users"); + assert_eq!(columns[0].column.column_name, "id"); + + assert_eq!(columns[1].column.column_name, "name"); + assert_eq!(columns[2].column.column_name, "email"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_list_all_files() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let files = provider.list_all_files(1).expect("Should list all files"); + + assert_eq!(files.len(), 2, "Should have 2 files"); + + assert_eq!(files[0].schema_name, "test_schema"); + assert_eq!(files[0].table_name, "users"); + assert_eq!(files[0].file.file.path, "data_001.parquet"); + assert!(files[0].file.delete_file.is_some()); + + assert_eq!(files[1].file.file.path, "data_002.parquet"); + assert!(files[1].file.delete_file.is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_concurrent_access() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let provider = Arc::new(provider); + + // Spawn 10 concurrent tasks + let mut tasks = Vec::new(); + for _ in 0..10 { + let provider = provider.clone(); + let task = tokio::spawn(async move { + let _snapshot = provider + .get_current_snapshot() + .expect("Should get snapshot"); + let _schemas = provider.list_schemas(1).expect("Should list schemas"); + let _tables = provider.list_tables(1, 1).expect("Should list tables"); + let _columns = provider + .get_table_structure(1) + .expect("Should get structure"); + }); + tasks.push(task); + } + + for task in tasks { + task.await.expect("Task should complete successfully"); + } +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_datafusion_integration() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + populate_test_data(&provider) + .await + .expect("Failed to populate test data"); + + let catalog = DuckLakeCatalog::new(provider).expect("Should create catalog"); + + let ctx = SessionContext::new(); + ctx.register_catalog("ducklake", Arc::new(catalog)); + + // Query information_schema + let df = ctx + .sql("SELECT schema_name FROM ducklake.information_schema.schemata") + .await + .expect("Should query information_schema"); + + let results = df.collect().await.expect("Should collect results"); + assert!(!results.is_empty(), "Should have schema results"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_error_invalid_connection_string() { + let result = MySqlMetadataProvider::new("invalid://connection:string").await; + assert!( + result.is_err(), + "Should fail with invalid connection string" + ); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_error_connection_refused() { + let result = MySqlMetadataProvider::new("mysql://root@localhost:9999/db").await; + assert!(result.is_err(), "Should fail when connection is refused"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_query_real_parquet_files() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + let (_data_path, _temp_dir) = populate_from_duckdb_catalog(&provider) + .await + .expect("Failed to populate from DuckDB catalog"); + + let catalog = DuckLakeCatalog::new(provider).expect("Should create catalog"); + + let ctx = SessionContext::new(); + ctx.register_catalog("ducklake", Arc::new(catalog)); + + // Query actual table data + let df = ctx + .sql("SELECT * FROM ducklake.main.users ORDER BY id") + .await + .expect("Should query table data"); + + let results = df.collect().await.expect("Should collect results"); + + assert_eq!(results.len(), 1, "Should have one batch"); + let batch = &results[0]; + assert_eq!(batch.num_rows(), 4, "Should have 4 rows"); + + // Verify schema + assert_eq!(batch.num_columns(), 3, "Should have 3 columns"); + let schema = batch.schema(); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(1).name(), "name"); + assert_eq!(schema.field(2).name(), "email"); + + // Verify first row data + use datafusion::arrow::array::{Int32Array, StringArray}; + let id_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let name_col = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let email_col = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(id_col.value(0), 1); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(email_col.value(0), "alice@example.com"); + + assert_eq!(id_col.value(1), 2); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(email_col.value(1), "bob@example.com"); +} + +#[tokio::test(flavor = "multi_thread")] +#[cfg_attr(all(feature = "skip-tests-with-docker", target_os = "macos"), ignore)] +async fn test_query_with_filter() { + let (provider, _container) = create_mysql_provider().await.unwrap(); + + let (_data_path, _temp_dir) = populate_from_duckdb_catalog(&provider) + .await + .expect("Failed to populate from DuckDB catalog"); + + let catalog = DuckLakeCatalog::new(provider).expect("Should create catalog"); + let ctx = SessionContext::new(); + ctx.register_catalog("ducklake", Arc::new(catalog)); + + // Query with WHERE filter + let df = ctx + .sql("SELECT name, email FROM ducklake.main.users WHERE id > 2 ORDER BY id") + .await + .expect("Should query with filter"); + + let results = df.collect().await.expect("Should collect results"); + + assert_eq!(results.len(), 1, "Should have one batch"); + let batch = &results[0]; + assert_eq!(batch.num_rows(), 2, "Should have 2 rows (Charlie, Diana)"); + + use datafusion::arrow::array::StringArray; + let name_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(name_col.value(0), "Charlie"); + assert_eq!(name_col.value(1), "Diana"); +} diff --git a/tests/postgres_metadata_provider_test.rs b/tests/postgres_metadata_provider_test.rs index 5ed3216..9e0d5cd 100644 --- a/tests/postgres_metadata_provider_test.rs +++ b/tests/postgres_metadata_provider_test.rs @@ -118,10 +118,10 @@ async fn init_schema(pool: &PgPool) -> anyhow::Result<()> { sqlx::query( "CREATE TABLE IF NOT EXISTS ducklake_metadata ( - key VARCHAR NOT NULL, + key VARCHAR NOT NULL PRIMARY KEY, value VARCHAR NOT NULL, - scope VARCHAR NOT NULL DEFAULT '', - PRIMARY KEY (key, scope) + scope VARCHAR, + scope_id BIGINT )", ) .execute(pool) @@ -199,12 +199,13 @@ async fn populate_test_data(provider: &PostgresMetadataProvider) -> anyhow::Resu .await?; // Insert metadata (data_path) - sqlx::query("INSERT INTO ducklake_metadata (key, value, scope) VALUES ($1, $2, $3)") - .bind("data_path") - .bind("file:///tmp/ducklake_data/") - .bind("") - .execute(pool) - .await?; + sqlx::query( + "INSERT INTO ducklake_metadata (key, value, scope, scope_id) VALUES ($1, $2, NULL, NULL)", + ) + .bind("data_path") + .bind("file:///tmp/ducklake_data/") + .execute(pool) + .await?; // Insert schema sqlx::query( @@ -398,12 +399,13 @@ async fn populate_from_duckdb_catalog( } // Insert data_path metadata - sqlx::query("INSERT INTO ducklake_metadata (key, value, scope) VALUES ($1, $2, $3)") - .bind("data_path") - .bind(&data_path) - .bind("") - .execute(&mut *tx) - .await?; + sqlx::query( + "INSERT INTO ducklake_metadata (key, value, scope, scope_id) VALUES ($1, $2, NULL, NULL)", + ) + .bind("data_path") + .bind(&data_path) + .execute(&mut *tx) + .await?; // Insert schemas, tables, columns, and files for schema in &schemas {