Skip to content

Commit 4dedc3f

Browse files
committed
feat: Include time column and table partitions in SAP DatasetConfig
1 parent f57d3b6 commit 4dedc3f

6 files changed

Lines changed: 18 additions & 28 deletions

File tree

crates/data-generation/src/dataset/mod.rs

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,39 +34,26 @@ pub struct DatasetTable {
3434
pub name: String,
3535
/// The Arrow schema for the table (without the time column).
3636
pub schema: SchemaRef,
37-
/// The time column for the table, if any.
37+
/// The time column for the table.
3838
///
39-
/// When set, this column is *not* included in [`schema`] — it is appended
39+
/// This column is *not* included in [`schema`] — it is appended
4040
/// during rehydration via [`DatasetTable::rehydrate`].
41-
pub time_column: Option<String>,
41+
pub time_column: String,
4242
}
4343

4444
impl DatasetTable {
45-
/// Returns the full schema including the time column, if one is configured.
46-
///
47-
/// If `time_column` is `None`, this returns the same schema as [`schema`].
45+
/// Returns the full schema including the time column.
4846
pub fn rehydrated_schema(&self) -> SchemaRef {
49-
let Some(ref time_col) = self.time_column else {
50-
return Arc::clone(&self.schema);
51-
};
52-
5347
let ts_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
5448
let mut fields: Vec<_> = self.schema.fields().iter().cloned().collect();
55-
fields.push(Arc::new(Field::new(time_col, ts_type, true)));
49+
fields.push(Arc::new(Field::new(&self.time_column, ts_type, true)));
5650
Arc::new(arrow::datatypes::Schema::new(fields))
5751
}
5852

5953
/// Rehydrate a batch by appending the time column with the current timestamp.
6054
///
61-
/// If this table has no `time_column`, the batch is returned unchanged.
6255
/// The batch schema must match [`schema`] (i.e. without the time column).
6356
pub fn rehydrate(&self, batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
64-
if self.time_column.is_none() {
65-
anyhow::bail!(
66-
"Cannot rehydrate table '{}' without a time column",
67-
self.name
68-
);
69-
}
7057

7158
if batch.schema() != self.schema {
7259
let mut diffs = Vec::new();
@@ -284,8 +271,7 @@ pub trait Dataset: Send + Sync {
284271
/// Rehydrate a batch for the given table by appending the time column.
285272
///
286273
/// Uses the table metadata from [`tables()`] to look up the time column name
287-
/// and delegates to [`DatasetTable::rehydrate`]. If the table has no time column,
288-
/// a rehydration error is returned.
274+
/// and delegates to [`DatasetTable::rehydrate`].
289275
fn rehydrate(&self, table: &str, batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
290276
let tables = self.tables();
291277
let dataset_table = tables

crates/data-generation/src/dataset/simple_sequence.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Dataset for SimpleSequenceDataset {
102102
DatasetTable {
103103
name: "integer_sequence".to_string(),
104104
schema: Self::schema(),
105-
time_column: Some("inserted_at".to_string()),
105+
time_column: "inserted_at".to_string(),
106106
},
107107
)])
108108
}

crates/data-generation/src/dataset/tpch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ impl Dataset for TpchDataset {
323323
DatasetTable {
324324
name: (*name).to_string(),
325325
schema: tpch_schema(name),
326-
time_column: Some((*time_col).to_string()),
326+
time_column: (*time_col).to_string(),
327327
},
328328
)
329329
})

crates/data-generation/src/ingestor.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,10 @@ impl Ingestor {
7373
"location".to_string(),
7474
serde_json::Value::String(loc_fn(&name)),
7575
);
76-
if let Some(ref time_col) = table.time_column {
77-
entry.insert(
78-
"time_column".to_string(),
79-
serde_json::Value::String(time_col.clone()),
80-
);
81-
}
76+
entry.insert(
77+
"time_column".to_string(),
78+
serde_json::Value::String(table.time_column.clone()),
79+
);
8280
map.insert(name, serde_json::Value::Object(entry));
8381
}
8482
println!("{}", serde_json::Value::Object(map));

crates/etl/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ impl ETLPipeline {
189189
etl_type: EtlType::S3,
190190
schema: table.rehydrated_schema(),
191191
params: self.target.table_params(&name),
192+
time_column: table.time_column.clone(),
193+
partitions: vec![], // TODO: support dynamically specifying partitioning schemes
192194
};
193195
(name, config)
194196
})

crates/system-adapter-protocol/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ pub struct DatasetConfig {
150150
pub schema: SchemaRef,
151151
/// ETL-specific configuration parameters
152152
pub params: HashMap<String, serde_json::Value>,
153+
/// The time column to use for append/change-stream capture for the dataset
154+
pub time_column: String,
155+
/// The table paritioning scheme to use for this dataset
156+
pub partitions: Vec<String>,
153157
}
154158

155159
/// Request to setup a benchmark run with ETL configuration

0 commit comments

Comments
 (0)