Skip to content

Commit 0b20750

Browse files
fix: Remove unused ingestor code, rename to DataGenerator (#59)
* fix: Remove unused ingestor code
* fix: Rename ingestor to generator
* chore: auto-fix cargo fmt + clippy
* Update generator.rs

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 5d54b60 commit 0b20750

3 files changed

Lines changed: 12 additions & 44 deletions

File tree

Lines changed: 3 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -26,14 +26,14 @@ use super::dataset::Dataset;
2626
use super::metrics::{IngestResult, Metrics};
2727
use super::target::Target;
2828

29-
pub struct Ingestor {
29+
pub struct DataGenerator {
3030
dataset: Arc<dyn Dataset>,
3131
target: Arc<dyn Target>,
3232
metrics: Metrics,
3333
semaphore: Arc<Semaphore>,
3434
}
3535

36-
impl Ingestor {
36+
impl DataGenerator {
3737
pub fn new(
3838
dataset: Arc<dyn Dataset>,
3939
target: Arc<dyn Target>,
@@ -52,38 +52,7 @@ impl Ingestor {
5252
///
5353
/// Pulls one batch per table from the dataset using `next_batches()`, then writes
5454
/// them sequentially so the data is guaranteed to be present when this returns.
55-
///
56-
/// If `table_location_fn` is provided, prints a JSON object mapping each table to
57-
/// its connector and location, e.g.:
58-
/// `{"customer": {"connector": "s3", "location": "s3://bucket/prefix/customer/"}, ...}`
59-
pub async fn initialize(
60-
&self,
61-
table_location_fn: Option<&dyn Fn(&str) -> String>,
62-
) -> anyhow::Result<IngestResult> {
63-
// Print table locations as JSON
64-
if let Some(loc_fn) = table_location_fn {
65-
let mut map = serde_json::Map::new();
66-
for (name, table) in self.dataset.tables() {
67-
let mut entry = serde_json::Map::new();
68-
entry.insert(
69-
"connector".to_string(),
70-
serde_json::Value::String("s3".to_string()),
71-
);
72-
entry.insert(
73-
"location".to_string(),
74-
serde_json::Value::String(loc_fn(&name)),
75-
);
76-
if let Some(ref time_col) = table.time_column {
77-
entry.insert(
78-
"time_column".to_string(),
79-
serde_json::Value::String(time_col.clone()),
80-
);
81-
}
82-
map.insert(name, serde_json::Value::Object(entry));
83-
}
84-
println!("{}", serde_json::Value::Object(map));
85-
}
86-
55+
pub async fn initialize(&self) -> anyhow::Result<IngestResult> {
8756
let table_count = self.dataset.tables().len();
8857

8958
tracing::info!(

crates/data-generation/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ limitations under the License.
1616

1717
pub mod config;
1818
pub mod dataset;
19-
pub mod ingestor;
19+
pub mod generator;
2020
pub mod metrics;
2121
pub mod source;
2222
pub mod target;

crates/data-generation/src/main.rs

Lines changed: 8 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use data_generation::config::{Cli, Command, CommonArgs};
2424
use data_generation::dataset;
2525
use data_generation::dataset::tpch::TpchDataset;
26-
use data_generation::ingestor::Ingestor;
26+
use data_generation::generator::DataGenerator;
2727
use data_generation::metrics::{IngestResult, Metrics};
2828
use data_generation::target::s3::S3Target;
2929

@@ -50,7 +50,7 @@ fn print_summary(result: &IngestResult) {
5050
println!(" Avg write latency: {:?}", result.avg_write_latency);
5151
}
5252

53-
fn build(args: &CommonArgs) -> anyhow::Result<(Ingestor, Arc<S3Target>)> {
53+
fn build(args: &CommonArgs) -> anyhow::Result<DataGenerator> {
5454
let dataset_config = args.dataset_config();
5555
let target_config = args.target_config();
5656
let ingestor_config = args.ingestor_config();
@@ -72,13 +72,13 @@ fn build(args: &CommonArgs) -> anyhow::Result<(Ingestor, Arc<S3Target>)> {
7272
let target = Arc::new(S3Target::new(&target_config)?);
7373
let metrics = Metrics::new();
7474

75-
let ingestor = Ingestor::new(
75+
let ingestor = DataGenerator::new(
7676
dataset,
77-
Arc::clone(&target) as Arc<dyn Target>,
77+
target as Arc<dyn Target>,
7878
&ingestor_config,
7979
metrics,
8080
);
81-
Ok((ingestor, target))
81+
Ok(ingestor)
8282
}
8383

8484
#[tokio::main]
@@ -91,16 +91,15 @@ async fn main() -> anyhow::Result<()> {
9191

9292
match cli.command {
9393
Command::Initialize(args) => {
94-
let (ingestor, target) = build(&args)?;
95-
let loc_fn = |table: &str| target.table_s3_path(table);
96-
let result = ingestor.initialize(Some(&loc_fn)).await?;
94+
let ingestor = build(&args)?;
95+
let result = ingestor.initialize().await?;
9796

9897
if result.write_errors > 0 {
9998
anyhow::bail!("Initialization failed with {} errors", result.write_errors);
10099
}
101100
}
102101
Command::Run(run_args) => {
103-
let (ingestor, _target) = build(&run_args.common)?;
102+
let ingestor = build(&run_args.common)?;
104103
if run_args.skip_initial {
105104
ingestor.skip_initial_batches().await?;
106105
}

0 commit comments

Comments
 (0)