Skip to content

Commit b8b1ed8

Browse files
fix: Generate proper step counts in data-gen for checkpoint evaluation (#140)
* fix: Generate proper step counts in data-gen * fix v2 * fix table creation over etl cli * chore: auto-fix cargo fmt + clippy * Update tpch.rs --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 18ebba9 commit b8b1ed8

7 files changed

Lines changed: 232 additions & 136 deletions

File tree

crates/data-generation/src/dataset/tpch.rs

Lines changed: 8 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ limitations under the License.
1515
*/
1616

1717
use std::collections::HashMap;
18-
use std::collections::VecDeque;
1918
use std::sync::atomic::{AtomicI64, AtomicU16, Ordering};
2019
use std::sync::{Arc, Mutex};
2120

@@ -73,34 +72,6 @@ const SF1_ROW_COUNTS: &[(&str, u64)] = &[
7372
("lineitem", 6_001_215),
7473
];
7574

76-
const MIN_TPCH_ROWS_PER_FILE: usize = 32_000;
77-
const MAX_TPCH_ROWS_PER_FILE: usize = 64_000;
78-
const DEFAULT_TPCH_MAX_ROWS_PER_FILE: usize = 48_000;
79-
80-
fn tpch_max_rows_per_file() -> usize {
81-
std::env::var("SPICEBENCH_TPCH_MAX_ROWS_PER_FILE")
82-
.ok()
83-
.and_then(|v| v.parse::<usize>().ok())
84-
.filter(|v| *v > 0)
85-
.map(|v| v.clamp(MIN_TPCH_ROWS_PER_FILE, MAX_TPCH_ROWS_PER_FILE))
86-
.unwrap_or(DEFAULT_TPCH_MAX_ROWS_PER_FILE)
87-
}
88-
89-
fn split_record_batch(batch: RecordBatch, max_rows: usize) -> VecDeque<RecordBatch> {
90-
if batch.num_rows() <= max_rows {
91-
return VecDeque::from([batch]);
92-
}
93-
94-
let mut out = VecDeque::new();
95-
let mut offset = 0usize;
96-
while offset < batch.num_rows() {
97-
let len = std::cmp::min(max_rows, batch.num_rows() - offset);
98-
out.push_back(batch.slice(offset, len));
99-
offset += len;
100-
}
101-
out
102-
}
103-
10475
/// Returns the expected total number of rows for a given table at the
10576
/// specified scale factor.
10677
fn total_rows_for_table(table: &str, scale_factor: f64) -> u64 {
@@ -393,16 +364,12 @@ pub struct TpchDataset {
393364
mutations: MutationConfig,
394365
/// Per-table step counter tracking which part to generate next (0-indexed).
395366
table_steps: HashMap<String, AtomicU16>,
396-
/// Per-table queue of already-generated chunks waiting to be emitted.
397-
pending_batches: HashMap<String, Mutex<VecDeque<RecordBatch>>>,
398367
/// Per-table primary key tracking for update/delete targeting.
399368
key_sets: HashMap<String, Mutex<IndexedKeySet<PrimaryKeyValue>>>,
400369
/// Global monotonically increasing operation counter for replay ordering.
401370
op_counter: AtomicI64,
402371
/// The storage backend for reading/writing table metadata.
403372
storage: Arc<dyn DataStorage>,
404-
/// Maximum number of rows per emitted batch/file.
405-
max_rows_per_file: usize,
406373
}
407374

408375
impl TpchDataset {
@@ -427,25 +394,14 @@ impl TpchDataset {
427394
.map(|(name, _)| (name.to_string(), AtomicU16::new(0)))
428395
.collect();
429396

430-
let pending_batches: HashMap<String, Mutex<VecDeque<RecordBatch>>> = TPCH_TABLES
431-
.iter()
432-
.map(|(name, _)| (name.to_string(), Mutex::new(VecDeque::new())))
433-
.collect();
434-
435-
let max_rows_per_file = tpch_max_rows_per_file();
436-
437-
info!(max_rows_per_file, "Configured TPCH maximum rows per file");
438-
439397
Ok(Self {
440398
scale_factor: config.scale_factor,
441399
num_steps: config.num_steps,
442400
mutations: mutations.clone(),
443401
table_steps,
444-
pending_batches,
445402
key_sets,
446403
op_counter: AtomicI64::new(0),
447404
storage,
448-
max_rows_per_file,
449405
})
450406
}
451407
}
@@ -507,15 +463,6 @@ impl Dataset for TpchDataset {
507463
}
508464

509465
async fn raw_next_batch(&self, table: &str) -> anyhow::Result<Option<RecordBatch>> {
510-
if let Some(queued) = self.pending_batches.get(table) {
511-
let mut queued = queued
512-
.lock()
513-
.map_err(|e| anyhow::anyhow!("lock poisoned: {e}"))?;
514-
if let Some(batch) = queued.pop_front() {
515-
return Ok(Some(batch));
516-
}
517-
}
518-
519466
// Each table independently tracks which step (part) it is on.
520467
let step_counter = self
521468
.table_steps
@@ -655,23 +602,7 @@ impl Dataset for TpchDataset {
655602
let op_indices: Vec<i64> = (op_base..op_base + total_rows as i64).collect();
656603
columns.push(Arc::new(Int64Array::from(op_indices)));
657604

658-
let combined_batch = RecordBatch::try_new(schema, columns)?;
659-
let mut chunks = split_record_batch(combined_batch, self.max_rows_per_file);
660-
661-
let first = chunks
662-
.pop_front()
663-
.ok_or_else(|| anyhow::anyhow!("internal error: no chunks produced"))?;
664-
665-
if !chunks.is_empty()
666-
&& let Some(queued) = self.pending_batches.get(table)
667-
{
668-
let mut queued = queued
669-
.lock()
670-
.map_err(|e| anyhow::anyhow!("lock poisoned: {e}"))?;
671-
queued.extend(chunks);
672-
}
673-
674-
Ok(Some(first))
605+
Ok(Some(RecordBatch::try_new(schema, columns)?))
675606
}
676607

677608
fn tables(&self) -> HashMap<String, DatasetTable> {
@@ -711,6 +642,7 @@ mod tests {
711642
&self,
712643
_table_name: &str,
713644
_batch_id: u64,
645+
_part_id: Option<usize>,
714646
) -> anyhow::Result<Option<ReadResult>> {
715647
Ok(None)
716648
}
@@ -724,6 +656,7 @@ mod tests {
724656
Ok(WriteResult {
725657
rows_written: 0,
726658
bytes_written: 0,
659+
part_ids: Vec::new(),
727660
})
728661
}
729662

@@ -785,7 +718,7 @@ mod tests {
785718
}
786719

787720
#[tokio::test]
788-
async fn tpch_num_batches_is_a_lower_bound_for_emitted_batches_per_table() {
721+
async fn tpch_emits_exactly_one_batch_per_step_for_non_static_tables() {
789722
let dataset = build_dataset(1.0, 7);
790723

791724
for (table, _) in TPCH_TABLES {
@@ -800,33 +733,12 @@ mod tests {
800733
emitted_batches += 1;
801734
}
802735

803-
assert!(
804-
emitted_batches >= dataset.num_batches(table),
805-
"emitted batches should be >= planned batches for table '{table}'"
806-
);
807-
}
808-
}
809-
810-
#[tokio::test]
811-
async fn tpch_batches_are_capped_to_max_rows_per_file() {
812-
let dataset = build_dataset(1.0, 7);
813-
814-
let mut saw_split = false;
815-
while let Some(batch) = dataset
816-
.raw_next_batch("lineitem")
817-
.await
818-
.expect("raw_next_batch should not fail")
819-
{
820-
assert!(
821-
batch.num_rows() <= DEFAULT_TPCH_MAX_ROWS_PER_FILE,
822-
"lineitem chunk exceeded max rows per file"
736+
assert_eq!(
737+
emitted_batches,
738+
dataset.num_batches(table),
739+
"emitted batches should match planned logical batches for table '{table}'"
823740
);
824-
if batch.num_rows() == DEFAULT_TPCH_MAX_ROWS_PER_FILE {
825-
saw_split = true;
826-
}
827741
}
828-
829-
assert!(saw_split, "expected at least one full-size split chunk");
830742
}
831743

832744
#[tokio::test]

crates/data-generation/src/generator.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,10 @@ impl DataGenerator {
8080
}
8181
});
8282

83-
// Track which batch IDs were successfully written per table so we can
84-
// persist them in the table metadata at the end of the run.
85-
let written_batch_ids: Arc<std::sync::Mutex<HashMap<String, Vec<u64>>>> =
83+
// Track which logical batch IDs were successfully written per table,
84+
// plus any split part IDs for each logical batch, so we can persist
85+
// both in table metadata at the end of the run.
86+
let written_batches: Arc<std::sync::Mutex<HashMap<String, HashMap<u64, Vec<usize>>>>> =
8687
Arc::new(std::sync::Mutex::new(HashMap::new()));
8788

8889
// For each table, spawn a generator task and an uploader task connected
@@ -124,7 +125,7 @@ impl DataGenerator {
124125
// --- Uploader task ---
125126
let target = self.target.clone();
126127
let metrics_up = self.metrics.clone();
127-
let written_ids = Arc::clone(&written_batch_ids);
128+
let written_ids = Arc::clone(&written_batches);
128129
join_set.spawn(async move {
129130
while let Some((batch_id, batch)) = rx.recv().await {
130131
let start = Instant::now();
@@ -133,10 +134,10 @@ impl DataGenerator {
133134
metrics_up.record_write(&result, start.elapsed());
134135
written_ids
135136
.lock()
136-
.expect("written_batch_ids lock poisoned")
137+
.expect("written_batches lock poisoned")
137138
.entry(table_name.clone())
138139
.or_default()
139-
.push(batch_id);
140+
.insert(batch_id, result.part_ids.clone());
140141
}
141142
Err(e) => {
142143
metrics_up.record_error();
@@ -166,15 +167,26 @@ impl DataGenerator {
166167
logger_handle.abort();
167168

168169
// Build and persist the consolidated version metadata (version.json).
169-
let written = Arc::try_unwrap(written_batch_ids)
170+
let written = Arc::try_unwrap(written_batches)
170171
.expect("all tasks should be finished")
171172
.into_inner()
172173
.expect("mutex should not be poisoned");
173174

174175
let dataset_tables = self.dataset.tables();
175176
let mut tables_metadata = HashMap::new();
176-
for (table_name, mut ids) in written {
177+
for (table_name, batch_parts) in written {
178+
let mut ids: Vec<u64> = batch_parts.keys().copied().collect();
177179
ids.sort_unstable();
180+
181+
let mut normalized_batch_parts: HashMap<u64, Vec<usize>> = HashMap::new();
182+
for (batch_id, mut part_ids) in batch_parts {
183+
if part_ids.is_empty() {
184+
continue;
185+
}
186+
part_ids.sort_unstable();
187+
normalized_batch_parts.insert(batch_id, part_ids);
188+
}
189+
178190
let key_columns = self.dataset.primary_key(&table_name);
179191
let dataset_table = dataset_tables.get(&table_name);
180192
let schema_json = dataset_table
@@ -192,6 +204,7 @@ impl DataGenerator {
192204
time_column,
193205
key_columns,
194206
batch_ids: ids.clone(),
207+
batch_parts: normalized_batch_parts,
195208
},
196209
);
197210

@@ -400,6 +413,7 @@ mod tests {
400413
&self,
401414
_table_name: &str,
402415
_batch_id: u64,
416+
_part_id: Option<usize>,
403417
) -> anyhow::Result<Option<ReadResult>> {
404418
Ok(None)
405419
}
@@ -430,6 +444,7 @@ mod tests {
430444
Ok(WriteResult {
431445
rows_written: batch.num_rows() as u64,
432446
bytes_written: 0,
447+
part_ids: Vec::new(),
433448
})
434449
}
435450

crates/data-generation/src/storage/mod.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,31 +39,39 @@ impl ReadResult {
3939
pub struct WriteResult {
4040
pub rows_written: u64,
4141
pub bytes_written: u64,
42+
pub part_ids: Vec<usize>,
4243
}
4344

4445
#[async_trait]
4546
pub trait DataStorage: Send + Sync + 'static {
4647
/// List available batch object paths for a given table.
4748
///
48-
/// Batches are stored under `tables/{table_name}/batch-NNNNNN.parquet`.
49+
/// A logical batch may be stored as either:
50+
/// - `tables/{table_name}/batch-NNNNNN.parquet`, or
51+
/// - one or more split parts like
52+
/// `tables/{table_name}/batch-NNNNNN-part-PPP.parquet`.
4953
async fn list_batches(&self, table_name: &str) -> anyhow::Result<Vec<String>>;
5054

51-
/// Read a single batch from the source by its batch ID and table name.
55+
/// Read a single batch object from the source.
5256
///
5357
/// Returns `Ok(None)` when the batch does not exist in the underlying
5458
/// storage (e.g. the table has fewer batches than others). The caller
5559
/// should treat this as the table having no more data.
5660
///
57-
/// Batches are stored at `tables/{table_name}/batch-{batch_id:06}.parquet`.
61+
/// If `part_id` is `Some(p)`, this reads the split-part object for that
62+
/// logical batch. If `part_id` is `None`, this reads the unsuffixed
63+
/// logical batch object.
5864
async fn read_batch(
5965
&self,
6066
table_name: &str,
6167
batch_id: u64,
68+
part_id: Option<usize>,
6269
) -> anyhow::Result<Option<ReadResult>>;
6370

64-
/// Write a single batch to storage for the given table and batch ID.
71+
/// Write a single logical batch to storage for the given table and batch ID.
6572
///
66-
/// Batches are written to `tables/{table_name}/batch-{batch_id:06}.parquet`.
73+
/// Implementations may split large batches across multiple physical files
74+
/// while preserving the same logical `batch_id`.
6775
async fn write(
6876
&self,
6977
table_name: &str,
@@ -113,6 +121,27 @@ pub trait DataStorage: Send + Sync + 'static {
113121
Ok(VecDeque::new())
114122
}
115123

124+
/// Reads split part IDs for a logical batch from version metadata.
125+
///
126+
/// Returns an empty vector when a batch has no split parts and should be
127+
/// read from the unsuffixed object path.
128+
async fn read_batch_parts(
129+
&self,
130+
table_name: &str,
131+
batch_id: u64,
132+
) -> anyhow::Result<Vec<usize>> {
133+
if let Some(metadata) = self.read_version_metadata().await?
134+
&& let Some(table_meta) = metadata.tables.get(table_name)
135+
&& let Some(part_ids) = table_meta.batch_parts.get(&batch_id)
136+
{
137+
let mut sorted = part_ids.clone();
138+
sorted.sort_unstable();
139+
return Ok(sorted);
140+
}
141+
142+
Ok(Vec::new())
143+
}
144+
116145
fn table_params(&self, table_name: &str) -> HashMap<String, serde_json::Value>;
117146

118147
/// Returns the list of file paths/URIs that would exist after a successful

0 commit comments

Comments
 (0)