Skip to content

Commit b2f5907

Browse files
committed
Clean up partial create rollout launches
1 parent 6d9ec31 commit b2f5907

1 file changed

Lines changed: 21 additions & 7 deletions

File tree

  • rust/otap-dataflow/crates/controller/src/live_control

rust/otap-dataflow/crates/controller/src/live_control/execution.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,13 +251,27 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
251251
"starting",
252252
None,
253253
);
254-
let deployed_key = self
255-
.launch_regular_pipeline_instance(
256-
&plan.resolved_pipeline,
257-
*core_id,
258-
plan.target_generation,
259-
)
260-
.map_err(|err| RolloutExecutionError::Failed(err.to_string()))?;
254+
let deployed_key = match self.launch_regular_pipeline_instance(
255+
&plan.resolved_pipeline,
256+
*core_id,
257+
plan.target_generation,
258+
) {
259+
Ok(deployed_key) => deployed_key,
260+
Err(err) => {
261+
let reason = err.to_string();
262+
self.update_rollout_core_state(
263+
&plan.pipeline_key,
264+
&plan.rollout.rollout_id,
265+
*core_id,
266+
"failed",
267+
Some(reason.clone()),
268+
);
269+
// Create rollouts have no previous generation to restore, so a launch
270+
// failure must tear down any candidate instances that were already started.
271+
let _ = self.shutdown_instances(&launched, plan.drain_timeout_secs);
272+
return Err(RolloutExecutionError::Failed(reason));
273+
}
274+
};
261275
launched.push(deployed_key);
262276
}
263277

0 commit comments

Comments
 (0)