Skip to content

Commit 5f0ab16

Browse files
authored
fix: ETL run supports not initializing first (#143)
1 parent 8e598af commit 5f0ab16

1 file changed

Lines changed: 9 additions & 9 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1195,14 +1195,6 @@ impl ETLPipeline {
11951195
///
11961196
/// The pipeline no longer has to be in the [`Initialized`] state; the first batch ID of each table is skipped only if `initialize()` was called first.
11971197
pub async fn run(&mut self, step_count: usize) -> anyhow::Result<()> {
1198-
let current_state = self.state_rx.borrow().clone();
1199-
if current_state != PipelineState::Initialized {
1200-
anyhow::bail!(
1201-
"Cannot run pipeline: current state is {:?} (must be Initialized)",
1202-
current_state
1203-
);
1204-
}
1205-
12061198
self.batch_budget = Some(step_count);
12071199
self.build_work_plan().await;
12081200
self.spawn_run_task(Some(step_count));
@@ -1248,9 +1240,17 @@ impl ETLPipeline {
12481240
let tables = dataset.tables();
12491241
let mut steps: BTreeMap<u64, Vec<String>> = BTreeMap::new();
12501242

1243+
// Only skip the first batch ID per table if initialize() was called
1244+
let skip_first = *self.state_rx.borrow() == PipelineState::Initialized;
1245+
12511246
for name in tables.keys() {
12521247
let ids = dataset.clone().batch_ids(name).await;
1253-
let initialized_id = ids.front().copied();
1248+
let initialized_id = if skip_first {
1249+
ids.front().copied()
1250+
} else {
1251+
None
1252+
};
1253+
12541254
let mut seen_ids = HashSet::new();
12551255

12561256
for id in ids {

0 commit comments

Comments
 (0)