Skip to content

Commit 29cfc80

Browse files
authored
Merge branch 'trunk' into peasee/260218-move-tableformat
2 parents e671049 + 53f58f3 commit 29cfc80

5 files changed

Lines changed: 14 additions & 23 deletions

File tree

crates/data-generation/src/dataset/mod.rs

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,40 +34,26 @@ pub struct DatasetTable {
3434
pub name: String,
3535
/// The Arrow schema for the table (without the time column).
3636
pub schema: SchemaRef,
37-
/// The time column for the table, if any.
37+
/// The time column for the table.
3838
///
39-
/// When set, this column is *not* included in [`schema`] — it is appended
39+
/// This column is *not* included in [`schema`] — it is appended
4040
/// during rehydration via [`DatasetTable::rehydrate`].
41-
pub time_column: Option<String>,
41+
pub time_column: String,
4242
}
4343

4444
impl DatasetTable {
45-
/// Returns the full schema including the time column, if one is configured.
46-
///
47-
/// If `time_column` is `None`, this returns the same schema as [`schema`].
45+
/// Returns the full schema including the time column.
4846
pub fn rehydrated_schema(&self) -> SchemaRef {
49-
let Some(ref time_col) = self.time_column else {
50-
return Arc::clone(&self.schema);
51-
};
52-
5347
let ts_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
5448
let mut fields: Vec<_> = self.schema.fields().iter().cloned().collect();
55-
fields.push(Arc::new(Field::new(time_col, ts_type, true)));
49+
fields.push(Arc::new(Field::new(&self.time_column, ts_type, true)));
5650
Arc::new(arrow::datatypes::Schema::new(fields))
5751
}
5852

5953
/// Rehydrate a batch by appending the time column with the current timestamp.
6054
///
61-
/// If this table has no `time_column`, the batch is returned unchanged.
6255
/// The batch schema must match [`schema`] (i.e. without the time column).
6356
pub fn rehydrate(&self, batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
64-
if self.time_column.is_none() {
65-
anyhow::bail!(
66-
"Cannot rehydrate table '{}' without a time column",
67-
self.name
68-
);
69-
}
70-
7157
if batch.schema() != self.schema {
7258
let mut diffs = Vec::new();
7359
let expected_fields = self.schema.fields();
@@ -284,8 +270,7 @@ pub trait Dataset: Send + Sync {
284270
/// Rehydrate a batch for the given table by appending the time column.
285271
///
286272
/// Uses the table metadata from [`tables()`] to look up the time column name
287-
/// and delegates to [`DatasetTable::rehydrate`]. If the table has no time column,
288-
/// a rehydration error is returned.
273+
/// and delegates to [`DatasetTable::rehydrate`].
289274
fn rehydrate(&self, table: &str, batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
290275
let tables = self.tables();
291276
let dataset_table = tables

crates/data-generation/src/dataset/simple_sequence.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Dataset for SimpleSequenceDataset {
102102
DatasetTable {
103103
name: "integer_sequence".to_string(),
104104
schema: Self::schema(),
105-
time_column: Some("inserted_at".to_string()),
105+
time_column: "inserted_at".to_string(),
106106
},
107107
)])
108108
}

crates/data-generation/src/dataset/tpch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ impl Dataset for TpchDataset {
323323
DatasetTable {
324324
name: (*name).to_string(),
325325
schema: tpch_schema(name),
326-
time_column: Some((*time_col).to_string()),
326+
time_column: (*time_col).to_string(),
327327
},
328328
)
329329
})

crates/etl/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ impl ETLPipeline {
189189
etl_type: EtlType::S3,
190190
schema: table.rehydrated_schema(),
191191
params: self.target.table_params(&name),
192+
time_column: table.time_column.clone(),
193+
partitions: vec![], // TODO: support dynamically specifying partitioning schemes
192194
};
193195
(name, config)
194196
})

crates/system-adapter-protocol/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ pub struct DatasetConfig {
152152
pub schema: SchemaRef,
153153
/// ETL-specific configuration parameters
154154
pub params: HashMap<String, serde_json::Value>,
155+
/// The time column to use for append/change-stream capture for the dataset
156+
pub time_column: String,
157+
/// The table partitioning scheme to use for this dataset
158+
pub partitions: Vec<String>,
155159
}
156160

157161
/// Request to setup a benchmark run with ETL configuration

0 commit comments

Comments
 (0)