Skip to content

Commit fb31c22

Browse files
authored
feat: Implement basic ETL pipeline for creates with rehydrations (#36)
* feat: Add ETLPipeline struct
* feat: Update ETL pipeline for E2E
* fix: Add params output for setup_request_datasets
1 parent c073c4e commit fb31c22

16 files changed

Lines changed: 767 additions & 91 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/data-generation/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Data Generator
2+
3+
```bash
4+
cargo run -p data-generation -- run --scale-factor 1 --bucket peasee-indexes --region us-west-2 --prefix raw --num-steps 10
5+
```

crates/data-generation/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub struct CommonArgs {
5252
pub scale_factor: f64,
5353

5454
/// Number of data generation steps (partitions for TPC-H dbgen)
55-
#[arg(long, default_value_t = 100)]
55+
#[arg(long, default_value_t = 25)]
5656
pub num_steps: u16,
5757

5858
/// S3 bucket name
@@ -72,7 +72,7 @@ pub struct CommonArgs {
7272
pub endpoint: Option<String>,
7373

7474
/// Maximum number of concurrent S3 writes
75-
#[arg(long, default_value_t = 8)]
75+
#[arg(long, default_value_t = 16)]
7676
pub max_concurrency: usize,
7777
}
7878

crates/data-generation/src/dataset/mod.rs

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ limitations under the License.
1717
pub mod tpch;
1818
pub mod simple_sequence;
1919

20-
use std::collections::HashMap;
20+
use std::collections::{HashMap, VecDeque};
2121
use std::sync::Arc;
2222
use std::time::{SystemTime, UNIX_EPOCH};
2323

2424
use arrow::array::{RecordBatch, TimestampMicrosecondArray};
2525
use arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
2626
use async_trait::async_trait;
2727

28+
use crate::config::DatasetConfig;
29+
2830
/// Metadata about a table in a dataset.
2931
#[derive(Debug, Clone)]
3032
pub struct DatasetTable {
@@ -64,11 +66,41 @@ impl DatasetTable {
6466
}
6567

6668
if batch.schema() != self.schema {
69+
let mut diffs = Vec::new();
70+
let expected_fields = self.schema.fields();
71+
let actual_schema = batch.schema();
72+
let actual_fields = actual_schema.fields();
73+
74+
for (i, expected) in expected_fields.iter().enumerate() {
75+
match actual_fields.get(i) {
76+
Some(actual) if actual != expected => {
77+
if actual.name() != expected.name() {
78+
diffs.push(format!(" column {i}: expected name '{}', got '{}'", expected.name(), actual.name()));
79+
}
80+
if actual.data_type() != expected.data_type() {
81+
diffs.push(format!(" column '{}' (index {i}): expected type {:?}, got {:?}", expected.name(), expected.data_type(), actual.data_type()));
82+
}
83+
if actual.is_nullable() != expected.is_nullable() {
84+
diffs.push(format!(" column '{}' (index {i}): expected nullable={}, got nullable={}", expected.name(), expected.is_nullable(), actual.is_nullable()));
85+
}
86+
}
87+
None => {
88+
diffs.push(format!(" column '{}' (index {i}): missing from batch", expected.name()));
89+
}
90+
_ => {}
91+
}
92+
}
93+
for i in expected_fields.len()..actual_fields.len() {
94+
diffs.push(format!(" column '{}' (index {i}): unexpected extra column in batch", actual_fields[i].name()));
95+
}
96+
if expected_fields.len() != actual_fields.len() {
97+
diffs.push(format!(" expected {} columns, got {}", expected_fields.len(), actual_fields.len()));
98+
}
99+
67100
anyhow::bail!(
68-
"Schema mismatch for table '{}': expected {}, got {}",
101+
"Schema mismatch for table '{}':\n{}",
69102
self.name,
70-
self.schema,
71-
batch.schema()
103+
diffs.join("\n")
72104
);
73105
}
74106

@@ -91,12 +123,27 @@ impl DatasetTable {
91123

92124
#[async_trait]
93125
pub trait Dataset: Send + Sync {
126+
/// Creates a new instance of this dataset from the given configuration.
127+
///
128+
/// This is a factory method that returns an `Arc<dyn Dataset>` without any
129+
/// external side-effects beyond initialising in-memory state.
130+
///
131+
/// The default implementation returns an error; concrete dataset types
132+
/// should override this.
133+
fn create(config: &DatasetConfig) -> anyhow::Result<Arc<dyn Dataset>>
134+
where
135+
Self: Sized + 'static,
136+
{
137+
let _ = config;
138+
anyhow::bail!("create() is not implemented for this dataset type")
139+
}
140+
94141
/// Returns the batch IDs that would be produced for a given table after a
95142
/// successful generation run.
96143
///
97144
/// The default implementation returns `0..num_batches(table)`, but
98145
/// implementations may override this to customise the ID scheme.
99-
fn batch_ids(&self, table: &str) -> Vec<u64> {
146+
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
100147
(0..self.num_batches(table)).collect()
101148
}
102149

@@ -127,9 +174,40 @@ pub trait Dataset: Send + Sync {
127174
};
128175

129176
if batch.schema() != expected_schema {
177+
let mut diffs = Vec::new();
178+
let expected_fields = expected_schema.fields();
179+
let actual_schema = batch.schema();
180+
let actual_fields = actual_schema.fields();
181+
182+
for (i, expected) in expected_fields.iter().enumerate() {
183+
match actual_fields.get(i) {
184+
Some(actual) if actual != expected => {
185+
if actual.name() != expected.name() {
186+
diffs.push(format!(" column {i}: expected name '{}', got '{}'", expected.name(), actual.name()));
187+
}
188+
if actual.data_type() != expected.data_type() {
189+
diffs.push(format!(" column '{}' (index {i}): expected type {:?}, got {:?}", expected.name(), expected.data_type(), actual.data_type()));
190+
}
191+
if actual.is_nullable() != expected.is_nullable() {
192+
diffs.push(format!(" column '{}' (index {i}): expected nullable={}, got nullable={}", expected.name(), expected.is_nullable(), actual.is_nullable()));
193+
}
194+
}
195+
None => {
196+
diffs.push(format!(" column '{}' (index {i}): missing from batch", expected.name()));
197+
}
198+
_ => {}
199+
}
200+
}
201+
for i in expected_fields.len()..actual_fields.len() {
202+
diffs.push(format!(" column '{}' (index {i}): unexpected extra column in batch", actual_fields[i].name()));
203+
}
204+
if expected_fields.len() != actual_fields.len() {
205+
diffs.push(format!(" expected {} columns, got {}", expected_fields.len(), actual_fields.len()));
206+
}
207+
130208
anyhow::bail!(
131-
"Schema mismatch for table '{table}': expected {expected_schema}, got {}",
132-
batch.schema()
209+
"Schema mismatch for table '{table}':\n{}",
210+
diffs.join("\n")
133211
);
134212
}
135213

@@ -171,7 +249,7 @@ pub trait Dataset: Send + Sync {
171249

172250
#[async_trait]
173251
impl Dataset for Arc<dyn Dataset> {
174-
fn batch_ids(&self, table: &str) -> Vec<u64> {
252+
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
175253
(**self).batch_ids(table)
176254
}
177255

crates/data-generation/src/dataset/simple_sequence.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ impl SimpleSequenceDataset {
6262

6363
#[async_trait]
6464
impl Dataset for SimpleSequenceDataset {
65+
fn create(config: &DatasetConfig) -> anyhow::Result<Arc<dyn Dataset>>
66+
where
67+
Self: Sized + 'static,
68+
{
69+
Ok(Arc::new(Self::new(config)))
70+
}
71+
6572
fn num_batches(&self, _table: &str) -> u64 {
6673
// One batch per step for the single table.
6774
u64::from(self.num_steps)

crates/data-generation/src/dataset/tpch.rs

Lines changed: 61 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -46,80 +46,80 @@ const TPCH_TABLE_TIME_COLUMNS: &[(&str, &str)] = &[
4646
fn tpch_schema(table: &str) -> SchemaRef {
4747
let fields: Vec<Field> = match table {
4848
"region" => vec![
49-
Field::new("r_regionkey", DataType::Int32, false),
50-
Field::new("r_name", DataType::Utf8, false),
49+
Field::new("r_regionkey", DataType::Int32, true),
50+
Field::new("r_name", DataType::Utf8, true),
5151
Field::new("r_comment", DataType::Utf8, true),
5252
],
5353
"nation" => vec![
54-
Field::new("n_nationkey", DataType::Int32, false),
55-
Field::new("n_name", DataType::Utf8, false),
56-
Field::new("n_regionkey", DataType::Int32, false),
54+
Field::new("n_nationkey", DataType::Int32, true),
55+
Field::new("n_name", DataType::Utf8, true),
56+
Field::new("n_regionkey", DataType::Int32, true),
5757
Field::new("n_comment", DataType::Utf8, true),
5858
],
5959
"supplier" => vec![
60-
Field::new("s_suppkey", DataType::Int32, false),
61-
Field::new("s_name", DataType::Utf8, false),
62-
Field::new("s_address", DataType::Utf8, false),
63-
Field::new("s_nationkey", DataType::Int32, false),
64-
Field::new("s_phone", DataType::Utf8, false),
65-
Field::new("s_acctbal", DataType::Decimal128(15, 2), false),
60+
Field::new("s_suppkey", DataType::Int64, true),
61+
Field::new("s_name", DataType::Utf8, true),
62+
Field::new("s_address", DataType::Utf8, true),
63+
Field::new("s_nationkey", DataType::Int32, true),
64+
Field::new("s_phone", DataType::Utf8, true),
65+
Field::new("s_acctbal", DataType::Decimal128(15, 2), true),
6666
Field::new("s_comment", DataType::Utf8, true),
6767
],
6868
"customer" => vec![
69-
Field::new("c_custkey", DataType::Int32, false),
70-
Field::new("c_name", DataType::Utf8, false),
71-
Field::new("c_address", DataType::Utf8, false),
72-
Field::new("c_nationkey", DataType::Int32, false),
73-
Field::new("c_phone", DataType::Utf8, false),
74-
Field::new("c_acctbal", DataType::Decimal128(15, 2), false),
75-
Field::new("c_mktsegment", DataType::Utf8, false),
69+
Field::new("c_custkey", DataType::Int64, true),
70+
Field::new("c_name", DataType::Utf8, true),
71+
Field::new("c_address", DataType::Utf8, true),
72+
Field::new("c_nationkey", DataType::Int32, true),
73+
Field::new("c_phone", DataType::Utf8, true),
74+
Field::new("c_acctbal", DataType::Decimal128(15, 2), true),
75+
Field::new("c_mktsegment", DataType::Utf8, true),
7676
Field::new("c_comment", DataType::Utf8, true),
7777
],
7878
"part" => vec![
79-
Field::new("p_partkey", DataType::Int32, false),
80-
Field::new("p_name", DataType::Utf8, false),
81-
Field::new("p_mfgr", DataType::Utf8, false),
82-
Field::new("p_brand", DataType::Utf8, false),
83-
Field::new("p_type", DataType::Utf8, false),
84-
Field::new("p_size", DataType::Int32, false),
85-
Field::new("p_container", DataType::Utf8, false),
86-
Field::new("p_retailprice", DataType::Decimal128(15, 2), false),
79+
Field::new("p_partkey", DataType::Int64, true),
80+
Field::new("p_name", DataType::Utf8, true),
81+
Field::new("p_mfgr", DataType::Utf8, true),
82+
Field::new("p_brand", DataType::Utf8, true),
83+
Field::new("p_type", DataType::Utf8, true),
84+
Field::new("p_size", DataType::Int32, true),
85+
Field::new("p_container", DataType::Utf8, true),
86+
Field::new("p_retailprice", DataType::Decimal128(15, 2), true),
8787
Field::new("p_comment", DataType::Utf8, true),
8888
],
8989
"partsupp" => vec![
90-
Field::new("ps_partkey", DataType::Int32, false),
91-
Field::new("ps_suppkey", DataType::Int32, false),
92-
Field::new("ps_availqty", DataType::Int32, false),
93-
Field::new("ps_supplycost", DataType::Decimal128(15, 2), false),
90+
Field::new("ps_partkey", DataType::Int64, true),
91+
Field::new("ps_suppkey", DataType::Int64, true),
92+
Field::new("ps_availqty", DataType::Int64, true),
93+
Field::new("ps_supplycost", DataType::Decimal128(15, 2), true),
9494
Field::new("ps_comment", DataType::Utf8, true),
9595
],
9696
"orders" => vec![
97-
Field::new("o_orderkey", DataType::Int32, false),
98-
Field::new("o_custkey", DataType::Int32, false),
99-
Field::new("o_orderstatus", DataType::Utf8, false),
100-
Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
101-
Field::new("o_orderdate", DataType::Date32, false),
102-
Field::new("o_orderpriority", DataType::Utf8, false),
103-
Field::new("o_clerk", DataType::Utf8, false),
104-
Field::new("o_shippriority", DataType::Int32, false),
97+
Field::new("o_orderkey", DataType::Int64, true),
98+
Field::new("o_custkey", DataType::Int64, true),
99+
Field::new("o_orderstatus", DataType::Utf8, true),
100+
Field::new("o_totalprice", DataType::Decimal128(15, 2), true),
101+
Field::new("o_orderdate", DataType::Date32, true),
102+
Field::new("o_orderpriority", DataType::Utf8, true),
103+
Field::new("o_clerk", DataType::Utf8, true),
104+
Field::new("o_shippriority", DataType::Int32, true),
105105
Field::new("o_comment", DataType::Utf8, true),
106106
],
107107
"lineitem" => vec![
108-
Field::new("l_orderkey", DataType::Int32, false),
109-
Field::new("l_partkey", DataType::Int32, false),
110-
Field::new("l_suppkey", DataType::Int32, false),
111-
Field::new("l_linenumber", DataType::Int32, false),
112-
Field::new("l_quantity", DataType::Decimal128(15, 2), false),
113-
Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
114-
Field::new("l_discount", DataType::Decimal128(15, 2), false),
115-
Field::new("l_tax", DataType::Decimal128(15, 2), false),
116-
Field::new("l_returnflag", DataType::Utf8, false),
117-
Field::new("l_linestatus", DataType::Utf8, false),
118-
Field::new("l_shipdate", DataType::Date32, false),
119-
Field::new("l_commitdate", DataType::Date32, false),
120-
Field::new("l_receiptdate", DataType::Date32, false),
121-
Field::new("l_shipinstruct", DataType::Utf8, false),
122-
Field::new("l_shipmode", DataType::Utf8, false),
108+
Field::new("l_orderkey", DataType::Int64, true),
109+
Field::new("l_partkey", DataType::Int64, true),
110+
Field::new("l_suppkey", DataType::Int64, true),
111+
Field::new("l_linenumber", DataType::Int64, true),
112+
Field::new("l_quantity", DataType::Decimal128(15, 2), true),
113+
Field::new("l_extendedprice", DataType::Decimal128(15, 2), true),
114+
Field::new("l_discount", DataType::Decimal128(15, 2), true),
115+
Field::new("l_tax", DataType::Decimal128(15, 2), true),
116+
Field::new("l_returnflag", DataType::Utf8, true),
117+
Field::new("l_linestatus", DataType::Utf8, true),
118+
Field::new("l_shipdate", DataType::Date32, true),
119+
Field::new("l_commitdate", DataType::Date32, true),
120+
Field::new("l_receiptdate", DataType::Date32, true),
121+
Field::new("l_shipinstruct", DataType::Utf8, true),
122+
Field::new("l_shipmode", DataType::Utf8, true),
123123
Field::new("l_comment", DataType::Utf8, true),
124124
],
125125
_ => unreachable!("unknown TPC-H table: {table}"),
@@ -214,13 +214,21 @@ impl TpchDataset {
214214

215215
#[async_trait]
216216
impl Dataset for TpchDataset {
217+
fn create(config: &DatasetConfig) -> anyhow::Result<Arc<dyn Dataset>>
218+
where
219+
Self: Sized + 'static,
220+
{
221+
Ok(Arc::new(Self::new(config)?))
222+
}
223+
217224
fn num_batches(&self, table: &str) -> u64 {
218225
if !TPCH_TABLE_TIME_COLUMNS
219226
.iter()
220227
.any(|(name, _)| *name == table)
221228
{
222229
return 0;
223230
}
231+
224232
// TPC-H produces one batch per table per step.
225233
u64::from(self.num_steps)
226234
}

0 commit comments

Comments
 (0)