Skip to content

Commit 5dc5764

Browse files
authored
feat(sqllogictest): use serde derived structs for schedule parsing (#1953)
This PR refactors the schedule file parsing in the sqllogictest crate to use serde-derived structs instead of manual TOML parsing, as requested in #1952. ### Changes - **New structs with serde derives:** - `ScheduleConfig` - top-level configuration parsed from TOML - `EngineConfig` - per-engine configuration with `#[serde(flatten)]` for extensibility - `EngineType` - enum of supported engine types - **Refactored parsing flow:** - `Schedule::from_file()` now uses `toml::from_str()` directly - Added `instantiate_engines()` to separate parsing from engine creation - Removed manual `parse_engines()` and `parse_steps()` functions - **Forward-compatibility:** - Uses `#[serde(flatten)]` to capture extra fields in `EngineConfig.extra` - This enables PR #1943 to easily add `catalog_type` and `catalog_properties` support ### Relation to #1943 This PR was suggested by @liurenjie1024 as a prerequisite to #1943 (dynamic catalog configuration). The `#[serde(flatten)]` approach allows #1943 to simply extract the catalog configuration from `EngineConfig.extra` without modifying the parsing logic. ### Testing - All existing tests pass - Added new unit tests for deserialization behavior - Integration test with `df_test.toml` passes unchanged Closes #1952
1 parent bc86d10 commit 5dc5764

File tree

3 files changed

+207
-105
lines changed

3 files changed

+207
-105
lines changed

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, Unbound
2727
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
2828
use iceberg_datafusion::IcebergCatalogProvider;
2929
use indicatif::ProgressBar;
30-
use toml::Table as TomlTable;
3130

32-
use crate::engine::{EngineRunner, run_slt_with_runner};
31+
use crate::engine::{DatafusionCatalogConfig, EngineRunner, run_slt_with_runner};
3332
use crate::error::Result;
3433

3534
pub struct DataFusionEngine {
@@ -59,22 +58,27 @@ impl EngineRunner for DataFusionEngine {
5958
}
6059

6160
impl DataFusionEngine {
62-
pub async fn new(config: TomlTable) -> Result<Self> {
61+
pub async fn new(catalog_config: Option<DatafusionCatalogConfig>) -> Result<Self> {
6362
let session_config = SessionConfig::new()
6463
.with_target_partitions(4)
6564
.with_information_schema(true);
6665
let ctx = SessionContext::new_with_config(session_config);
67-
ctx.register_catalog("default", Self::create_catalog(&config).await?);
66+
ctx.register_catalog(
67+
"default",
68+
Self::create_catalog(catalog_config.as_ref()).await?,
69+
);
6870

6971
Ok(Self {
7072
test_data_path: PathBuf::from("testdata"),
7173
session_context: ctx,
7274
})
7375
}
7476

75-
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
76-
// TODO: support dynamic catalog configuration
77-
// See: https://github.com/apache/iceberg-rust/issues/1780
77+
async fn create_catalog(
78+
_catalog_config: Option<&DatafusionCatalogConfig>,
79+
) -> anyhow::Result<Arc<dyn CatalogProvider>> {
80+
// TODO: Use catalog_config to load different catalog types via iceberg-catalog-loader
81+
// See: https://github.com/apache/iceberg-rust/issues/1780
7882
let catalog = MemoryCatalogBuilder::default()
7983
.load(
8084
"memory",

crates/sqllogictest/src/engine/mod.rs

Lines changed: 73 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,45 @@
1717

1818
mod datafusion;
1919

20+
use std::collections::HashMap;
2021
use std::path::Path;
2122

2223
use anyhow::anyhow;
24+
use serde::Deserialize;
2325
use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
24-
use toml::Table as TomlTable;
2526

2627
use crate::engine::datafusion::DataFusionEngine;
2728
use crate::error::{Error, Result};
2829

29-
const TYPE_DATAFUSION: &str = "datafusion";
30+
/// Configuration for the catalog used by the DataFusion engine
31+
#[derive(Debug, Clone, Deserialize)]
32+
pub struct DatafusionCatalogConfig {
33+
/// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql"
34+
#[serde(rename = "type")]
35+
pub catalog_type: String,
36+
/// Catalog properties passed to the catalog loader
37+
#[serde(default)]
38+
pub props: HashMap<String, String>,
39+
}
40+
41+
/// Engine configuration as a tagged enum
42+
#[derive(Debug, Clone, Deserialize)]
43+
#[serde(tag = "type", rename_all = "lowercase")]
44+
pub enum EngineConfig {
45+
Datafusion {
46+
#[serde(default)]
47+
catalog: Option<DatafusionCatalogConfig>,
48+
},
49+
}
3050

3151
#[async_trait::async_trait]
3252
pub trait EngineRunner: Send {
3353
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
3454
}
3555

36-
pub async fn load_engine_runner(
37-
engine_type: &str,
38-
cfg: TomlTable,
39-
) -> Result<Box<dyn EngineRunner>> {
40-
match engine_type {
41-
TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
42-
_ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()),
56+
pub async fn load_engine_runner(config: EngineConfig) -> Result<Box<dyn EngineRunner>> {
57+
match config {
58+
EngineConfig::Datafusion { catalog } => Ok(Box::new(DataFusionEngine::new(catalog).await?)),
4359
}
4460
}
4561

@@ -65,29 +81,63 @@ where
6581

6682
#[cfg(test)]
6783
mod tests {
68-
use crate::engine::{TYPE_DATAFUSION, load_engine_runner};
84+
use crate::engine::{DatafusionCatalogConfig, EngineConfig, load_engine_runner};
6985

70-
#[tokio::test]
71-
async fn test_engine_invalid_type() {
86+
#[test]
87+
fn test_deserialize_engine_config() {
88+
let input = r#"type = "datafusion""#;
89+
90+
let config: EngineConfig = toml::from_str(input).unwrap();
91+
assert!(matches!(config, EngineConfig::Datafusion { catalog: None }));
92+
}
93+
94+
#[test]
95+
fn test_deserialize_engine_config_with_catalog() {
96+
let input = r#"
97+
type = "datafusion"
98+
99+
[catalog]
100+
type = "rest"
101+
102+
[catalog.props]
103+
uri = "http://localhost:8181"
104+
"#;
105+
106+
let config: EngineConfig = toml::from_str(input).unwrap();
107+
match config {
108+
EngineConfig::Datafusion { catalog: Some(cat) } => {
109+
assert_eq!(cat.catalog_type, "rest");
110+
assert_eq!(
111+
cat.props.get("uri"),
112+
Some(&"http://localhost:8181".to_string())
113+
);
114+
}
115+
_ => panic!("Expected Datafusion with catalog"),
116+
}
117+
}
118+
119+
#[test]
120+
fn test_deserialize_catalog_config() {
72121
let input = r#"
73-
[engines]
74-
random = { type = "random_engine", url = "http://localhost:8181" }
122+
type = "memory"
123+
124+
[props]
125+
warehouse = "file:///tmp/warehouse"
75126
"#;
76-
let tbl = toml::from_str(input).unwrap();
77-
let result = load_engine_runner("random_engine", tbl).await;
78127

79-
assert!(result.is_err());
128+
let config: DatafusionCatalogConfig = toml::from_str(input).unwrap();
129+
assert_eq!(config.catalog_type, "memory");
130+
assert_eq!(
131+
config.props.get("warehouse"),
132+
Some(&"file:///tmp/warehouse".to_string())
133+
);
80134
}
81135

82136
#[tokio::test]
83137
async fn test_load_datafusion() {
84-
let input = r#"
85-
[engines]
86-
df = { type = "datafusion" }
87-
"#;
88-
let tbl = toml::from_str(input).unwrap();
89-
let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;
138+
let config = EngineConfig::Datafusion { catalog: None };
90139

140+
let result = load_engine_runner(config).await;
91141
assert!(result.is_ok());
92142
}
93143
}

0 commit comments

Comments
 (0)