Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/checkpointer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn main() -> anyhow::Result<()> {
}

pipeline.initialize().await?;
pipeline.run(cli.checkpoint_interval_steps as usize)?;
pipeline.run(cli.checkpoint_interval_steps as usize).await?;

let mut checkpoint_idx: usize = 0;

Expand Down
38 changes: 28 additions & 10 deletions crates/data-generation/src/dataset/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use async_trait::async_trait;
use crate::config::DatasetConfig;
use crate::dataset::simple_sequence::SimpleSequenceDataset;
use crate::dataset::tpch::TpchDataset;
use crate::storage::DataStorage;

/// Metadata about a table in a dataset.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -169,17 +170,29 @@ pub trait Dataset: Send + Sync {
fn create(
config: &DatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> anyhow::Result<Arc<dyn Dataset>>
where
Self: Sized + 'static;

/// Returns the batch IDs that would be produced for a given table after a
/// successful generation run.
/// Returns the [`DataStorage`] configured for this dataset.
///
/// The default implementation returns `0..num_batches(table)`, but
/// implementations may override this to customise the ID scheme.
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
(0..self.num_batches(table)).collect()
/// This is used by the default [`batch_ids`] implementation to read
/// batch IDs from the table-level metadata file stored in the backend.
fn storage(self: Arc<Self>) -> Arc<dyn DataStorage>;

/// Returns the batch IDs for a given table by reading the table-level
/// metadata from the configured [`DataStorage`].
///
/// Falls back to `0..num_batches(table)` if the metadata file does not
/// exist or contains no batch entries.
async fn batch_ids(self: Arc<Self>, table: &str) -> VecDeque<u64> {
let num_batches = self.num_batches(table);
let storage = self.storage();
match storage.read_batch_ids(table).await {
Ok(ids) if !ids.is_empty() => ids,
_ => (0..num_batches).collect(),
}
}

/// Returns the total number of batches this dataset will produce for the
Expand Down Expand Up @@ -313,21 +326,26 @@ impl Dataset for Arc<dyn Dataset> {
fn create(
config: &DatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> anyhow::Result<Arc<dyn Dataset>>
where
Self: Sized + 'static,
{
match config.dataset_type.as_str() {
"tpch" => TpchDataset::create(config, mutations),
"simple_sequence" => SimpleSequenceDataset::create(config, mutations),
"tpch" => TpchDataset::create(config, mutations, storage),
"simple_sequence" => SimpleSequenceDataset::create(config, mutations, storage),
other => {
anyhow::bail!("Unknown dataset type: {other}. Supported: tpch, simple_sequence")
}
}
}

fn batch_ids(&self, table: &str) -> VecDeque<u64> {
(**self).batch_ids(table)
fn storage(self: Arc<Self>) -> Arc<dyn DataStorage> {
(*self).clone().storage()
}

async fn batch_ids(self: Arc<Self>, table: &str) -> VecDeque<u64> {
(*self).clone().batch_ids(table).await
}

fn num_batches(&self, table: &str) -> u64 {
Expand Down
17 changes: 15 additions & 2 deletions crates/data-generation/src/dataset/simple_sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rand::Rng;
use crate::config::DatasetConfig;
use crate::dataset::MutationConfig;
use crate::dataset::key_set::IndexedKeySet;
use crate::storage::DataStorage;

use super::{Dataset, DatasetTable};

Expand Down Expand Up @@ -59,10 +60,16 @@ pub struct SimpleSequenceDataset {
key_set: Mutex<IndexedKeySet<i64>>,
/// Global monotonically increasing operation counter for replay ordering.
op_counter: AtomicI64,
/// The storage backend for reading/writing table metadata.
storage: Arc<dyn DataStorage>,
}

impl SimpleSequenceDataset {
pub fn new(config: &DatasetConfig, mutations: &MutationConfig) -> Self {
pub fn new(
config: &DatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> Self {
let batch_size = (config.scale_factor * 1000.0) as usize;
Self {
batch_size,
Expand All @@ -72,6 +79,7 @@ impl SimpleSequenceDataset {
remaining_steps: AtomicU16::new(config.num_steps),
key_set: Mutex::new(IndexedKeySet::new()),
op_counter: AtomicI64::new(0),
storage,
}
}

Expand All @@ -96,11 +104,16 @@ impl Dataset for SimpleSequenceDataset {
fn create(
config: &DatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> anyhow::Result<Arc<dyn Dataset>>
where
Self: Sized + 'static,
{
Ok(Arc::new(Self::new(config, mutations)))
Ok(Arc::new(Self::new(config, mutations, storage)))
}

fn storage(self: Arc<Self>) -> Arc<dyn DataStorage> {
Arc::clone(&self.storage)
}

fn primary_key(&self, _table: &str) -> Vec<String> {
Expand Down
17 changes: 15 additions & 2 deletions crates/data-generation/src/dataset/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tracing::info;
use crate::config::DatasetConfig;
use crate::dataset::MutationConfig;
use crate::dataset::key_set::{IndexedKeySet, PrimaryKeyValue};
use crate::storage::DataStorage;

use super::{Dataset, DatasetTable};

Expand Down Expand Up @@ -357,10 +358,16 @@ pub struct TpchDataset {
key_sets: HashMap<String, Mutex<IndexedKeySet<PrimaryKeyValue>>>,
/// Global monotonically increasing operation counter for replay ordering.
op_counter: AtomicI64,
/// The storage backend for reading/writing table metadata.
storage: Arc<dyn DataStorage>,
}

impl TpchDataset {
pub fn new(config: &DatasetConfig, mutations: &MutationConfig) -> anyhow::Result<Self> {
pub fn new(
config: &DatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> anyhow::Result<Self> {
info!(
scale_factor = config.scale_factor,
num_steps = config.num_steps,
Expand All @@ -384,6 +391,7 @@ impl TpchDataset {
table_steps,
key_sets,
op_counter: AtomicI64::new(0),
storage,
})
}
}
Expand All @@ -393,11 +401,16 @@ impl Dataset for TpchDataset {
fn create(
config: &DatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> anyhow::Result<Arc<dyn Dataset>>
where
Self: Sized + 'static,
{
Ok(Arc::new(Self::new(config, mutations)?))
Ok(Arc::new(Self::new(config, mutations, storage)?))
}

fn storage(self: Arc<Self>) -> Arc<dyn DataStorage> {
Arc::clone(&self.storage)
}

fn primary_key(&self, table: &str) -> Vec<String> {
Expand Down
2 changes: 1 addition & 1 deletion crates/data-generation/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl DataGenerator {

let mut batch_ids = HashMap::new();
for table in self.dataset.tables().keys() {
batch_ids.insert(table.clone(), self.dataset.batch_ids(table));
batch_ids.insert(table.clone(), self.dataset.clone().batch_ids(table).await);
}

loop {
Expand Down
6 changes: 4 additions & 2 deletions crates/data-generation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ fn build(args: &CommonArgs) -> anyhow::Result<DataGenerator> {

let mutations_config = MutationConfig::new(0.1, 0.1);

let dataset: Arc<dyn Dataset> = Arc::create(&dataset_config, &mutations_config)?;

let target = Arc::new(S3Storage::new(&target_config)?);
let storage: Arc<dyn DataStorage> = target.clone() as Arc<dyn DataStorage>;

let dataset: Arc<dyn Dataset> = Arc::create(&dataset_config, &mutations_config, storage)?;

let metrics = Metrics::new();

let ingestor = DataGenerator::new(
Expand Down
10 changes: 10 additions & 0 deletions crates/data-generation/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod s3;
use arrow::array::RecordBatch;
use async_trait::async_trait;
use std::collections::HashMap;
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchOperation {
Expand Down Expand Up @@ -82,4 +83,13 @@ pub trait DataStorage: Send + Sync + 'static {
/// This is a planning method — no I/O is performed. Each implementation
/// maps `(table_name, batch_id)` to its own path scheme (e.g. an S3 URI).
fn expected_files(&self, table_name: &str, batch_ids: &[u64]) -> Vec<String>;

/// Reads the batch IDs recorded in the table-level metadata file.
///
/// Returns the batch IDs in ascending order. If no metadata file exists
/// (or the implementation does not support metadata), the default
/// returns an empty `VecDeque`.
async fn read_batch_ids(&self, _table_name: &str) -> anyhow::Result<VecDeque<u64>> {
Ok(VecDeque::new())
}
}
24 changes: 24 additions & 0 deletions crates/data-generation/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use std::collections::HashMap;
use std::collections::VecDeque;

use super::{BatchOperation, ReadResult, WriteResult};

Expand Down Expand Up @@ -335,6 +336,29 @@ impl DataStorage for S3Storage {
Ok(paths)
}

async fn read_batch_ids(&self, table_name: &str) -> anyhow::Result<VecDeque<u64>> {
let metadata_path = self.table_metadata_object_path(table_name);
let get_result = match self.store.get(&metadata_path).await {
Ok(r) => r,
Err(object_store::Error::NotFound { .. }) => return Ok(VecDeque::new()),
Err(e) => return Err(e.into()),
};

let bytes = get_result.bytes().await?;
let table_meta: serde_json::Value = serde_json::from_slice(&bytes)?;

let Some(batches) = table_meta.get("batches").and_then(|b| b.as_object()) else {
return Ok(VecDeque::new());
};

let mut ids: Vec<u64> = batches
.keys()
.filter_map(|k| k.parse::<u64>().ok())
.collect();
ids.sort_unstable();
Ok(VecDeque::from(ids))
}

async fn read_batch(
&self,
table_name: &str,
Expand Down
21 changes: 12 additions & 9 deletions crates/etl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,13 @@ impl DatasetSource {
&self,
config: &GenerationDatasetConfig,
mutations: &MutationConfig,
storage: Arc<dyn DataStorage>,
) -> anyhow::Result<Arc<dyn Dataset>> {
match self {
DatasetSource::SimpleSequence => SimpleSequenceDataset::create(config, mutations),
DatasetSource::Tpch => TpchDataset::create(config, mutations),
DatasetSource::SimpleSequence => {
SimpleSequenceDataset::create(config, mutations, storage)
}
DatasetSource::Tpch => TpchDataset::create(config, mutations, storage),
}
}
}
Expand Down Expand Up @@ -224,7 +227,7 @@ impl ETLPipeline {
data_sink: Arc<dyn Sink>,
mutations: &MutationConfig,
) -> anyhow::Result<Self> {
let dataset = dataset_source.create(config, mutations)?;
let dataset = dataset_source.create(config, mutations, Arc::clone(&data_storage))?;
let (state_tx, state_rx) = watch::channel(PipelineState::NotStarted);
Ok(Self {
dataset_source,
Expand Down Expand Up @@ -399,7 +402,7 @@ impl ETLPipeline {
/// processed, the [`CancellationToken`] is triggered, or an error occurs.
///
/// Returns an error if the pipeline is not in the [`Initialized`] state.
pub fn start(&mut self) -> anyhow::Result<()> {
pub async fn start(&mut self) -> anyhow::Result<()> {
let current_state = self.state_rx.borrow().clone();
if current_state != PipelineState::Initialized {
anyhow::bail!(
Expand All @@ -409,7 +412,7 @@ impl ETLPipeline {
}

self.batch_budget = None;
self.build_work_plan();
self.build_work_plan().await;
self.spawn_run_task(None);
Ok(())
}
Expand All @@ -427,7 +430,7 @@ impl ETLPipeline {
/// [`PipelineState::Stopped(StopReason::Completed)`].
///
/// Returns an error if the pipeline is not in the [`Initialized`] state.
pub fn run(&mut self, step_count: usize) -> anyhow::Result<()> {
pub async fn run(&mut self, step_count: usize) -> anyhow::Result<()> {
let current_state = self.state_rx.borrow().clone();
if current_state != PipelineState::Initialized {
anyhow::bail!(
Expand All @@ -437,7 +440,7 @@ impl ETLPipeline {
}

self.batch_budget = Some(step_count);
self.build_work_plan();
self.build_work_plan().await;
self.spawn_run_task(Some(step_count));
Ok(())
}
Expand Down Expand Up @@ -473,13 +476,13 @@ impl ETLPipeline {

/// Build the initial work plan from the dataset and store it in
/// `self.work_state`.
fn build_work_plan(&self) {
async fn build_work_plan(&self) {
let dataset = &self.dataset;
let tables = dataset.tables();
let mut steps: BTreeMap<u64, Vec<String>> = BTreeMap::new();

for name in tables.keys() {
for id in dataset.batch_ids(name) {
for id in dataset.clone().batch_ids(name).await {
// Skip batch 0 — it was already processed during initialize().
if id == 0 {
continue;
Expand Down
2 changes: 1 addition & 1 deletion crates/etl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn main() -> anyhow::Result<()> {
}

pipeline.initialize().await?;
pipeline.start()?;
pipeline.start().await?;

let final_state = pipeline.wait().await;
match &final_state {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ pub(crate) async fn run(
// --- Start the ETL pipeline (remaining batches) ---
tracing::info!("Starting ETL pipeline (remaining batches)...");
let mut etl_state_rx = etl_pipeline.state_watch();
etl_pipeline.start()?;
etl_pipeline.start().await?;

let test_future = throughput_test.wait();
tokio::pin!(test_future);
Expand Down
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use clap::Parser;
use data_generation::config::{DatasetConfig as GenerationDatasetConfig, TargetConfig};
use data_generation::dataset::Dataset;
use data_generation::dataset::MutationConfig;
use data_generation::storage::DataStorage;
use data_generation::storage::s3::S3Storage;
use etl::sink::adbc::AdbcSink;
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
Expand Down Expand Up @@ -121,7 +122,11 @@ async fn main() -> anyhow::Result<()> {
let run_id = uuid::Uuid::new_v4();
let mutations = MutationConfig::new(0.1, 0.1);

let setup_dataset = dataset_source.create(&generation_config, &mutations)?;
let setup_dataset = dataset_source.create(
&generation_config,
&mutations,
Arc::clone(&source) as Arc<dyn DataStorage>,
)?;
let datasets = create_tables_request_datasets(&setup_dataset);

let setup_metadata = std::collections::HashMap::from([
Expand Down