Skip to content

Commit 37e366e

Browse files
fix: Remove internal columns from ETL presented schema (#103)
* fix: Remove internal columns from ETL presented schema * fix * chore: auto-fix cargo fmt + clippy --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 8ccd075 commit 37e366e

2 files changed

Lines changed: 264 additions & 43 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 261 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,20 @@ impl ETLPipeline {
444444
.tables()
445445
.into_iter()
446446
.map(|(name, table)| {
447+
// Strip internal columns (_op, _op_index) from the schema, as
448+
// these are removed before data is written to the sink.
449+
let fields: Vec<_> = table
450+
.schema
451+
.fields()
452+
.iter()
453+
.filter(|f| !INTERNAL_COLUMNS.contains(&f.name().as_str()))
454+
.cloned()
455+
.collect();
456+
let schema: SchemaRef = Arc::new(Schema::new(fields));
447457
let schema = if with_created_at {
448-
schema_with_created_at(&table.schema)
458+
schema_with_created_at(&schema)
449459
} else {
450-
table.schema.clone()
460+
schema
451461
};
452462
let config = ProtocolDatasetConfig { schema };
453463
(name, config)
@@ -966,3 +976,252 @@ async fn run_pipeline(
966976
);
967977
PipelineState::Stopped(StopReason::Completed)
968978
}
979+
980+
#[cfg(test)]
981+
mod tests {
982+
use super::*;
983+
use arrow::array::RecordBatch;
984+
use async_trait::async_trait;
985+
use data_generation::config::DatasetConfig as GenerationDatasetConfig;
986+
use data_generation::dataset::MutationConfig;
987+
use data_generation::storage::{DataStorage, ReadResult, WriteResult};
988+
use data_generation::version::VersionMetadata;
989+
use std::collections::{HashMap, VecDeque};
990+
991+
/// A no-op [`DataStorage`] implementation for testing.
992+
///
993+
/// The TPCH dataset only needs storage during batch generation/reading,
994+
/// which is not exercised by `create_tables_request_datasets`.
995+
struct MockStorage;
996+
997+
#[async_trait]
998+
impl DataStorage for MockStorage {
999+
async fn list_batches(&self, _table_name: &str) -> anyhow::Result<Vec<String>> {
1000+
Ok(Vec::new())
1001+
}
1002+
1003+
async fn read_batch(
1004+
&self,
1005+
_table_name: &str,
1006+
_batch_id: u64,
1007+
) -> anyhow::Result<Option<ReadResult>> {
1008+
Ok(None)
1009+
}
1010+
1011+
async fn write(
1012+
&self,
1013+
_table_name: &str,
1014+
_batch_id: u64,
1015+
_batch: RecordBatch,
1016+
) -> anyhow::Result<WriteResult> {
1017+
Ok(WriteResult {
1018+
rows_written: 0,
1019+
bytes_written: 0,
1020+
})
1021+
}
1022+
1023+
async fn write_version_metadata(&self, _metadata: &VersionMetadata) -> anyhow::Result<()> {
1024+
Ok(())
1025+
}
1026+
1027+
async fn read_version_metadata(&self) -> anyhow::Result<Option<VersionMetadata>> {
1028+
Ok(None)
1029+
}
1030+
1031+
async fn read_batch_ids(&self, _table_name: &str) -> anyhow::Result<VecDeque<u64>> {
1032+
Ok(VecDeque::new())
1033+
}
1034+
1035+
fn table_params(&self, _table_name: &str) -> HashMap<String, serde_json::Value> {
1036+
HashMap::new()
1037+
}
1038+
1039+
fn expected_files(&self, _table_name: &str, _batch_ids: &[u64]) -> Vec<String> {
1040+
Vec::new()
1041+
}
1042+
}
1043+
1044+
/// A no-op [`Sink`] implementation for testing.
1045+
struct MockSink;
1046+
1047+
#[async_trait]
1048+
impl Sink for MockSink {
1049+
async fn write(
1050+
&self,
1051+
_table_name: &str,
1052+
_batch_id: u64,
1053+
_batch: RecordBatch,
1054+
_op: InsertOp,
1055+
) -> anyhow::Result<()> {
1056+
Ok(())
1057+
}
1058+
}
1059+
1060+
/// Helper to build a pipeline with the TPCH dataset for testing.
1061+
fn make_tpch_pipeline(with_created_at: bool) -> ETLPipeline {
1062+
let config = GenerationDatasetConfig {
1063+
dataset_type: "tpch".to_string(),
1064+
scale_factor: 0.01,
1065+
num_steps: 1,
1066+
};
1067+
let mutations = MutationConfig::new(0.0, 0.0);
1068+
let storage: Arc<dyn DataStorage> = Arc::new(MockStorage);
1069+
let sink: Arc<dyn Sink> = Arc::new(MockSink);
1070+
1071+
let pipeline = ETLPipeline::new(DatasetSource::Tpch, &config, storage, sink, &mutations)
1072+
.expect("failed to create pipeline");
1073+
1074+
if with_created_at {
1075+
pipeline.with_created_at(true)
1076+
} else {
1077+
pipeline
1078+
}
1079+
}
1080+
1081+
#[test]
1082+
fn create_tables_request_datasets_strips_internal_columns() {
1083+
let pipeline = make_tpch_pipeline(false);
1084+
let datasets = pipeline.create_tables_request_datasets();
1085+
1086+
// The TPCH dataset should expose all 8 tables.
1087+
assert!(!datasets.is_empty(), "Expected non-empty dataset map");
1088+
1089+
for (table_name, dataset_config) in &datasets {
1090+
let field_names: Vec<&str> = dataset_config
1091+
.schema
1092+
.fields()
1093+
.iter()
1094+
.map(|f| f.name().as_str())
1095+
.collect();
1096+
1097+
assert!(
1098+
!field_names.contains(&"_op"),
1099+
"Table '{table_name}' schema should not contain '_op' column, but found fields: {field_names:?}"
1100+
);
1101+
assert!(
1102+
!field_names.contains(&"_op_index"),
1103+
"Table '{table_name}' schema should not contain '_op_index' column, but found fields: {field_names:?}"
1104+
);
1105+
}
1106+
}
1107+
1108+
#[test]
1109+
fn create_tables_request_datasets_does_not_include_created_at_by_default() {
1110+
let pipeline = make_tpch_pipeline(false);
1111+
let datasets = pipeline.create_tables_request_datasets();
1112+
1113+
for (table_name, dataset_config) in &datasets {
1114+
let field_names: Vec<&str> = dataset_config
1115+
.schema
1116+
.fields()
1117+
.iter()
1118+
.map(|f| f.name().as_str())
1119+
.collect();
1120+
1121+
assert!(
1122+
!field_names.contains(&CREATED_AT_COLUMN),
1123+
"Table '{table_name}' should not have '{CREATED_AT_COLUMN}' when with_created_at is false, but found fields: {field_names:?}"
1124+
);
1125+
}
1126+
}
1127+
1128+
#[test]
1129+
fn create_tables_request_datasets_includes_created_at_when_enabled() {
1130+
let pipeline = make_tpch_pipeline(true);
1131+
let datasets = pipeline.create_tables_request_datasets();
1132+
1133+
for (table_name, dataset_config) in &datasets {
1134+
let field_names: Vec<&str> = dataset_config
1135+
.schema
1136+
.fields()
1137+
.iter()
1138+
.map(|f| f.name().as_str())
1139+
.collect();
1140+
1141+
assert!(
1142+
field_names.contains(&CREATED_AT_COLUMN),
1143+
"Table '{table_name}' should have '{CREATED_AT_COLUMN}' when with_created_at is true, but found fields: {field_names:?}"
1144+
);
1145+
1146+
// Internal columns should still be stripped.
1147+
assert!(
1148+
!field_names.contains(&"_op"),
1149+
"Table '{table_name}' schema should not contain '_op' even with created_at enabled, but found fields: {field_names:?}"
1150+
);
1151+
assert!(
1152+
!field_names.contains(&"_op_index"),
1153+
"Table '{table_name}' schema should not contain '_op_index' even with created_at enabled, but found fields: {field_names:?}"
1154+
);
1155+
}
1156+
}
1157+
1158+
#[test]
1159+
fn create_tables_request_datasets_preserves_data_columns() {
1160+
let pipeline = make_tpch_pipeline(false);
1161+
let datasets = pipeline.create_tables_request_datasets();
1162+
1163+
// Verify the raw dataset's tables DO have internal columns (sanity
1164+
// check that the test is meaningful).
1165+
let raw_tables = pipeline.dataset().tables();
1166+
for (table_name, raw_table) in &raw_tables {
1167+
let raw_field_names: Vec<&str> = raw_table
1168+
.schema
1169+
.fields()
1170+
.iter()
1171+
.map(|f| f.name().as_str())
1172+
.collect();
1173+
assert!(
1174+
raw_field_names.contains(&"_op"),
1175+
"Sanity check: raw TPCH table '{table_name}' should contain '_op'"
1176+
);
1177+
assert!(
1178+
raw_field_names.contains(&"_op_index"),
1179+
"Sanity check: raw TPCH table '{table_name}' should contain '_op_index'"
1180+
);
1181+
}
1182+
1183+
// Now verify the returned datasets have all the non-internal columns.
1184+
for (table_name, raw_table) in &raw_tables {
1185+
let dataset_config = datasets
1186+
.get(table_name)
1187+
.unwrap_or_else(|| panic!("Table '{table_name}' missing from datasets"));
1188+
1189+
let expected_fields: Vec<&str> = raw_table
1190+
.schema
1191+
.fields()
1192+
.iter()
1193+
.map(|f| f.name().as_str())
1194+
.filter(|name| !INTERNAL_COLUMNS.contains(name))
1195+
.collect();
1196+
let actual_fields: Vec<&str> = dataset_config
1197+
.schema
1198+
.fields()
1199+
.iter()
1200+
.map(|f| f.name().as_str())
1201+
.collect();
1202+
1203+
assert_eq!(
1204+
expected_fields, actual_fields,
1205+
"Table '{table_name}' should retain all non-internal fields"
1206+
);
1207+
}
1208+
}
1209+
1210+
#[test]
1211+
fn create_tables_request_datasets_returns_all_tpch_tables() {
1212+
let pipeline = make_tpch_pipeline(false);
1213+
let datasets = pipeline.create_tables_request_datasets();
1214+
1215+
let expected_tables = [
1216+
"region", "nation", "supplier", "customer", "part", "partsupp", "orders", "lineitem",
1217+
];
1218+
1219+
for table in expected_tables {
1220+
assert!(
1221+
datasets.contains_key(table),
1222+
"Expected table '{table}' in create_tables_request_datasets output"
1223+
);
1224+
}
1225+
assert_eq!(datasets.len(), expected_tables.len());
1226+
}
1227+
}

src/main.rs

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::{collections::HashMap, sync::Arc};
17+
use std::sync::Arc;
1818

1919
use adbc_client::AdbcConnection;
20-
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2120
use checkpointer::CheckpointStore;
2221
use clap::Parser;
2322
use data_generation::config::{TargetConfig, build_version_prefix};
24-
use data_generation::dataset::Dataset;
2523
use data_generation::storage::DataStorage;
2624
use data_generation::storage::s3::S3Storage;
2725
use data_generation::version::VersionMetadata;
@@ -39,31 +37,6 @@ mod scenario;
3937
use crate::args::CommonArgs;
4038
use crate::commands::connect_system_adapter;
4139

42-
fn create_tables_request_datasets(
43-
dataset: &Arc<dyn Dataset>,
44-
with_created_at: bool,
45-
) -> HashMap<String, system_adapter_protocol::DatasetConfig> {
46-
dataset
47-
.tables()
48-
.into_iter()
49-
.map(|(name, table)| {
50-
let schema = if with_created_at {
51-
let mut fields: Vec<_> = table.schema.fields().iter().cloned().collect();
52-
fields.push(Arc::new(Field::new(
53-
"__created_at",
54-
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
55-
true,
56-
)));
57-
Arc::new(Schema::new(fields))
58-
} else {
59-
table.schema.clone()
60-
};
61-
62-
(name, system_adapter_protocol::DatasetConfig { schema })
63-
})
64-
.collect()
65-
}
66-
6740
#[derive(Parser)]
6841
#[command(author, version, about, long_about = None)]
6942
struct Cli {
@@ -84,7 +57,6 @@ async fn run_benchmark(
8457
adbc_driver: system_adapter_protocol::SetupResponse,
8558
version_metadata: &VersionMetadata,
8659
source: Arc<S3Storage>,
87-
datasets: HashMap<String, system_adapter_protocol::DatasetConfig>,
8860
) -> anyhow::Result<()> {
8961
// --- Download checkpoints from S3 ---
9062
let scenario_name = common.scenario.to_string();
@@ -161,6 +133,8 @@ async fn run_benchmark(
161133
)?
162134
.with_created_at(common.with_created_at);
163135

136+
let datasets = pipeline.create_tables_request_datasets();
137+
164138
if let Err(e) = system_adapter_client.create_tables(run_id, datasets).await {
165139
pipeline.cancel();
166140
return Err(anyhow::anyhow!(
@@ -252,10 +226,6 @@ async fn main() -> anyhow::Result<()> {
252226
)
253227
})?;
254228

255-
let dataset_source = DatasetSource::from_dataset_type(&version_metadata.dataset_type)?;
256-
let generation_config = version_metadata.dataset_config();
257-
let mutations = version_metadata.mutation_config();
258-
259229
// --- Connect to the system adapter ---
260230
let mut system_adapter_client = match connect_system_adapter(&cli.common).await {
261231
Ok(system_adapter_client) => system_adapter_client,
@@ -266,13 +236,6 @@ async fn main() -> anyhow::Result<()> {
266236

267237
let run_id = uuid::Uuid::new_v4();
268238

269-
let setup_dataset = dataset_source.create(
270-
&generation_config,
271-
&mutations,
272-
Arc::clone(&source) as Arc<dyn DataStorage>,
273-
)?;
274-
let datasets = create_tables_request_datasets(&setup_dataset, cli.common.with_created_at);
275-
276239
let setup_metadata = std::collections::HashMap::from([
277240
(
278241
"executor_instance_type".to_string(),
@@ -298,7 +261,6 @@ async fn main() -> anyhow::Result<()> {
298261
adbc_driver,
299262
&version_metadata,
300263
source,
301-
datasets,
302264
)
303265
.await;
304266

0 commit comments

Comments
 (0)