Skip to content

Commit 249dcad

Browse files
authored
fix: Read batch IDs from table metadata instead of computing them (#78)
1 parent 5cbdb2a commit 249dcad

12 files changed

Lines changed: 118 additions & 30 deletions

File tree

crates/checkpointer/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ async fn main() -> anyhow::Result<()> {
204204
}
205205

206206
pipeline.initialize().await?;
207-
pipeline.run(cli.checkpoint_interval_steps as usize)?;
207+
pipeline.run(cli.checkpoint_interval_steps as usize).await?;
208208

209209
let mut checkpoint_idx: usize = 0;
210210

crates/data-generation/src/dataset/mod.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use async_trait::async_trait;
2929
use crate::config::DatasetConfig;
3030
use crate::dataset::simple_sequence::SimpleSequenceDataset;
3131
use crate::dataset::tpch::TpchDataset;
32+
use crate::storage::DataStorage;
3233

3334
/// Metadata about a table in a dataset.
3435
#[derive(Debug, Clone)]
@@ -169,17 +170,29 @@ pub trait Dataset: Send + Sync {
169170
fn create(
170171
config: &DatasetConfig,
171172
mutations: &MutationConfig,
173+
storage: Arc<dyn DataStorage>,
172174
) -> anyhow::Result<Arc<dyn Dataset>>
173175
where
174176
Self: Sized + 'static;
175177

176-
/// Returns the batch IDs that would be produced for a given table after a
177-
/// successful generation run.
178+
/// Returns the [`DataStorage`] configured for this dataset.
178179
///
179-
/// The default implementation returns `0..num_batches(table)`, but
180-
/// implementations may override this to customise the ID scheme.
181-
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
182-
(0..self.num_batches(table)).collect()
180+
/// This is used by the default [`batch_ids`] implementation to read
181+
/// batch IDs from the table-level metadata file stored in the backend.
182+
fn storage(self: Arc<Self>) -> Arc<dyn DataStorage>;
183+
184+
/// Returns the batch IDs for a given table by reading the table-level
185+
/// metadata from the configured [`DataStorage`].
186+
///
187+
/// Falls back to `0..num_batches(table)` if the metadata file does not
188+
/// exist or contains no batch entries.
189+
async fn batch_ids(self: Arc<Self>, table: &str) -> VecDeque<u64> {
190+
let num_batches = self.num_batches(table);
191+
let storage = self.storage();
192+
match storage.read_batch_ids(table).await {
193+
Ok(ids) if !ids.is_empty() => ids,
194+
_ => (0..num_batches).collect(),
195+
}
183196
}
184197

185198
/// Returns the total number of batches this dataset will produce for the
@@ -313,21 +326,26 @@ impl Dataset for Arc<dyn Dataset> {
313326
fn create(
314327
config: &DatasetConfig,
315328
mutations: &MutationConfig,
329+
storage: Arc<dyn DataStorage>,
316330
) -> anyhow::Result<Arc<dyn Dataset>>
317331
where
318332
Self: Sized + 'static,
319333
{
320334
match config.dataset_type.as_str() {
321-
"tpch" => TpchDataset::create(config, mutations),
322-
"simple_sequence" => SimpleSequenceDataset::create(config, mutations),
335+
"tpch" => TpchDataset::create(config, mutations, storage),
336+
"simple_sequence" => SimpleSequenceDataset::create(config, mutations, storage),
323337
other => {
324338
anyhow::bail!("Unknown dataset type: {other}. Supported: tpch, simple_sequence")
325339
}
326340
}
327341
}
328342

329-
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
330-
(**self).batch_ids(table)
343+
fn storage(self: Arc<Self>) -> Arc<dyn DataStorage> {
344+
(*self).clone().storage()
345+
}
346+
347+
async fn batch_ids(self: Arc<Self>, table: &str) -> VecDeque<u64> {
348+
(*self).clone().batch_ids(table).await
331349
}
332350

333351
fn num_batches(&self, table: &str) -> u64 {

crates/data-generation/src/dataset/simple_sequence.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use rand::Rng;
2626
use crate::config::DatasetConfig;
2727
use crate::dataset::MutationConfig;
2828
use crate::dataset::key_set::IndexedKeySet;
29+
use crate::storage::DataStorage;
2930

3031
use super::{Dataset, DatasetTable};
3132

@@ -59,10 +60,16 @@ pub struct SimpleSequenceDataset {
5960
key_set: Mutex<IndexedKeySet<i64>>,
6061
/// Global monotonically increasing operation counter for replay ordering.
6162
op_counter: AtomicI64,
63+
/// The storage backend for reading/writing table metadata.
64+
storage: Arc<dyn DataStorage>,
6265
}
6366

6467
impl SimpleSequenceDataset {
65-
pub fn new(config: &DatasetConfig, mutations: &MutationConfig) -> Self {
68+
pub fn new(
69+
config: &DatasetConfig,
70+
mutations: &MutationConfig,
71+
storage: Arc<dyn DataStorage>,
72+
) -> Self {
6673
let batch_size = (config.scale_factor * 1000.0) as usize;
6774
Self {
6875
batch_size,
@@ -72,6 +79,7 @@ impl SimpleSequenceDataset {
7279
remaining_steps: AtomicU16::new(config.num_steps),
7380
key_set: Mutex::new(IndexedKeySet::new()),
7481
op_counter: AtomicI64::new(0),
82+
storage,
7583
}
7684
}
7785

@@ -96,11 +104,16 @@ impl Dataset for SimpleSequenceDataset {
96104
fn create(
97105
config: &DatasetConfig,
98106
mutations: &MutationConfig,
107+
storage: Arc<dyn DataStorage>,
99108
) -> anyhow::Result<Arc<dyn Dataset>>
100109
where
101110
Self: Sized + 'static,
102111
{
103-
Ok(Arc::new(Self::new(config, mutations)))
112+
Ok(Arc::new(Self::new(config, mutations, storage)))
113+
}
114+
115+
fn storage(self: Arc<Self>) -> Arc<dyn DataStorage> {
116+
Arc::clone(&self.storage)
104117
}
105118

106119
fn primary_key(&self, _table: &str) -> Vec<String> {

crates/data-generation/src/dataset/tpch.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use tracing::info;
3939
use crate::config::DatasetConfig;
4040
use crate::dataset::MutationConfig;
4141
use crate::dataset::key_set::{IndexedKeySet, PrimaryKeyValue};
42+
use crate::storage::DataStorage;
4243

4344
use super::{Dataset, DatasetTable};
4445

@@ -357,10 +358,16 @@ pub struct TpchDataset {
357358
key_sets: HashMap<String, Mutex<IndexedKeySet<PrimaryKeyValue>>>,
358359
/// Global monotonically increasing operation counter for replay ordering.
359360
op_counter: AtomicI64,
361+
/// The storage backend for reading/writing table metadata.
362+
storage: Arc<dyn DataStorage>,
360363
}
361364

362365
impl TpchDataset {
363-
pub fn new(config: &DatasetConfig, mutations: &MutationConfig) -> anyhow::Result<Self> {
366+
pub fn new(
367+
config: &DatasetConfig,
368+
mutations: &MutationConfig,
369+
storage: Arc<dyn DataStorage>,
370+
) -> anyhow::Result<Self> {
364371
info!(
365372
scale_factor = config.scale_factor,
366373
num_steps = config.num_steps,
@@ -384,6 +391,7 @@ impl TpchDataset {
384391
table_steps,
385392
key_sets,
386393
op_counter: AtomicI64::new(0),
394+
storage,
387395
})
388396
}
389397
}
@@ -393,11 +401,16 @@ impl Dataset for TpchDataset {
393401
fn create(
394402
config: &DatasetConfig,
395403
mutations: &MutationConfig,
404+
storage: Arc<dyn DataStorage>,
396405
) -> anyhow::Result<Arc<dyn Dataset>>
397406
where
398407
Self: Sized + 'static,
399408
{
400-
Ok(Arc::new(Self::new(config, mutations)?))
409+
Ok(Arc::new(Self::new(config, mutations, storage)?))
410+
}
411+
412+
fn storage(self: Arc<Self>) -> Arc<dyn DataStorage> {
413+
Arc::clone(&self.storage)
401414
}
402415

403416
fn primary_key(&self, table: &str) -> Vec<String> {

crates/data-generation/src/generator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl DataGenerator {
139139

140140
let mut batch_ids = HashMap::new();
141141
for table in self.dataset.tables().keys() {
142-
batch_ids.insert(table.clone(), self.dataset.batch_ids(table));
142+
batch_ids.insert(table.clone(), self.dataset.clone().batch_ids(table).await);
143143
}
144144

145145
loop {

crates/data-generation/src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,11 @@ fn build(args: &CommonArgs) -> anyhow::Result<DataGenerator> {
6565

6666
let mutations_config = MutationConfig::new(0.1, 0.1);
6767

68-
let dataset: Arc<dyn Dataset> = Arc::create(&dataset_config, &mutations_config)?;
69-
7068
let target = Arc::new(S3Storage::new(&target_config)?);
69+
let storage: Arc<dyn DataStorage> = target.clone() as Arc<dyn DataStorage>;
70+
71+
let dataset: Arc<dyn Dataset> = Arc::create(&dataset_config, &mutations_config, storage)?;
72+
7173
let metrics = Metrics::new();
7274

7375
let ingestor = DataGenerator::new(

crates/data-generation/src/storage/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub mod s3;
1919
use arrow::array::RecordBatch;
2020
use async_trait::async_trait;
2121
use std::collections::HashMap;
22+
use std::collections::VecDeque;
2223

2324
#[derive(Debug, Clone, PartialEq, Eq)]
2425
pub enum BatchOperation {
@@ -82,4 +83,13 @@ pub trait DataStorage: Send + Sync + 'static {
8283
/// This is a planning method — no I/O is performed. Each implementation
8384
/// maps `(table_name, batch_id)` to its own path scheme (e.g. an S3 URI).
8485
fn expected_files(&self, table_name: &str, batch_ids: &[u64]) -> Vec<String>;
86+
87+
/// Reads the batch IDs recorded in the table-level metadata file.
88+
///
89+
/// Returns the batch IDs in ascending order. If no metadata file exists
90+
/// (or the implementation does not support metadata), the default
91+
/// returns an empty `VecDeque`.
92+
async fn read_batch_ids(&self, _table_name: &str) -> anyhow::Result<VecDeque<u64>> {
93+
Ok(VecDeque::new())
94+
}
8595
}

crates/data-generation/src/storage/s3.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
3232
use parquet::basic::Compression;
3333
use parquet::file::properties::WriterProperties;
3434
use std::collections::HashMap;
35+
use std::collections::VecDeque;
3536

3637
use super::{BatchOperation, ReadResult, WriteResult};
3738

@@ -335,6 +336,29 @@ impl DataStorage for S3Storage {
335336
Ok(paths)
336337
}
337338

339+
async fn read_batch_ids(&self, table_name: &str) -> anyhow::Result<VecDeque<u64>> {
340+
let metadata_path = self.table_metadata_object_path(table_name);
341+
let get_result = match self.store.get(&metadata_path).await {
342+
Ok(r) => r,
343+
Err(object_store::Error::NotFound { .. }) => return Ok(VecDeque::new()),
344+
Err(e) => return Err(e.into()),
345+
};
346+
347+
let bytes = get_result.bytes().await?;
348+
let table_meta: serde_json::Value = serde_json::from_slice(&bytes)?;
349+
350+
let Some(batches) = table_meta.get("batches").and_then(|b| b.as_object()) else {
351+
return Ok(VecDeque::new());
352+
};
353+
354+
let mut ids: Vec<u64> = batches
355+
.keys()
356+
.filter_map(|k| k.parse::<u64>().ok())
357+
.collect();
358+
ids.sort_unstable();
359+
Ok(VecDeque::from(ids))
360+
}
361+
338362
async fn read_batch(
339363
&self,
340364
table_name: &str,

crates/etl/src/lib.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,13 @@ impl DatasetSource {
124124
&self,
125125
config: &GenerationDatasetConfig,
126126
mutations: &MutationConfig,
127+
storage: Arc<dyn DataStorage>,
127128
) -> anyhow::Result<Arc<dyn Dataset>> {
128129
match self {
129-
DatasetSource::SimpleSequence => SimpleSequenceDataset::create(config, mutations),
130-
DatasetSource::Tpch => TpchDataset::create(config, mutations),
130+
DatasetSource::SimpleSequence => {
131+
SimpleSequenceDataset::create(config, mutations, storage)
132+
}
133+
DatasetSource::Tpch => TpchDataset::create(config, mutations, storage),
131134
}
132135
}
133136
}
@@ -224,7 +227,7 @@ impl ETLPipeline {
224227
data_sink: Arc<dyn Sink>,
225228
mutations: &MutationConfig,
226229
) -> anyhow::Result<Self> {
227-
let dataset = dataset_source.create(config, mutations)?;
230+
let dataset = dataset_source.create(config, mutations, Arc::clone(&data_storage))?;
228231
let (state_tx, state_rx) = watch::channel(PipelineState::NotStarted);
229232
Ok(Self {
230233
dataset_source,
@@ -399,7 +402,7 @@ impl ETLPipeline {
399402
/// processed, the [`CancellationToken`] is triggered, or an error occurs.
400403
///
401404
/// Returns an error if the pipeline is not in the [`Initialized`] state.
402-
pub fn start(&mut self) -> anyhow::Result<()> {
405+
pub async fn start(&mut self) -> anyhow::Result<()> {
403406
let current_state = self.state_rx.borrow().clone();
404407
if current_state != PipelineState::Initialized {
405408
anyhow::bail!(
@@ -409,7 +412,7 @@ impl ETLPipeline {
409412
}
410413

411414
self.batch_budget = None;
412-
self.build_work_plan();
415+
self.build_work_plan().await;
413416
self.spawn_run_task(None);
414417
Ok(())
415418
}
@@ -427,7 +430,7 @@ impl ETLPipeline {
427430
/// [`PipelineState::Stopped(StopReason::Completed)`].
428431
///
429432
/// Returns an error if the pipeline is not in the [`Initialized`] state.
430-
pub fn run(&mut self, step_count: usize) -> anyhow::Result<()> {
433+
pub async fn run(&mut self, step_count: usize) -> anyhow::Result<()> {
431434
let current_state = self.state_rx.borrow().clone();
432435
if current_state != PipelineState::Initialized {
433436
anyhow::bail!(
@@ -437,7 +440,7 @@ impl ETLPipeline {
437440
}
438441

439442
self.batch_budget = Some(step_count);
440-
self.build_work_plan();
443+
self.build_work_plan().await;
441444
self.spawn_run_task(Some(step_count));
442445
Ok(())
443446
}
@@ -473,13 +476,13 @@ impl ETLPipeline {
473476

474477
/// Build the initial work plan from the dataset and store it in
475478
/// `self.work_state`.
476-
fn build_work_plan(&self) {
479+
async fn build_work_plan(&self) {
477480
let dataset = &self.dataset;
478481
let tables = dataset.tables();
479482
let mut steps: BTreeMap<u64, Vec<String>> = BTreeMap::new();
480483

481484
for name in tables.keys() {
482-
for id in dataset.batch_ids(name) {
485+
for id in dataset.clone().batch_ids(name).await {
483486
// Skip batch 0 — it was already processed during initialize().
484487
if id == 0 {
485488
continue;

crates/etl/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ async fn main() -> anyhow::Result<()> {
142142
}
143143

144144
pipeline.initialize().await?;
145-
pipeline.start()?;
145+
pipeline.start().await?;
146146

147147
let final_state = pipeline.wait().await;
148148
match &final_state {

0 commit comments

Comments
 (0)