@@ -268,12 +268,13 @@ def perform_migration(
268268
269269 for dataflow in dataflows :
270270 try :
271+ # Update queue/workpool for all dataflows first
272+ self .update_dataflow_queue (dataflow , new_queue , final_workpool )
273+
271274 # For scheduled pipelines, update pipeline which will handle git steps automatically
275+ # This runs after queue update so the schedule toggle picks up the new queue
272276 if queue_type == "scheduled_pipeline_queue" :
273277 self .update_scheduled_pipeline (dataflow )
274-
275- # Update queue/workpool for all dataflows
276- self .update_dataflow_queue (dataflow , new_queue , final_workpool )
277278 self .stdout .write (f" ✓ Updated dataflow: { dataflow .deployment_name } " )
278279 updated_count += 1
279280 except Exception as e :
@@ -363,6 +364,17 @@ def update_scheduled_pipeline(self, dataflow: OrgDataFlowv1):
363364 # Update pipeline using PipelineService (handles git steps based on workpool)
364365 PipelineService .update_pipeline (dataflow .org , dataflow .deployment_id , update_payload )
365366
367+ # Toggle schedule inactive → active to clear pre-scheduled runs.
368+ # Prefect schedules runs 1-2 days in advance; those won't pick up
369+ # the updated deployment params unless the schedule is reset.
370+ if dataflow .cron and pipeline_details .get ("isScheduleActive" , False ):
371+ PipelineService .set_pipeline_schedule (
372+ dataflow .org , dataflow .deployment_id , "inactive"
373+ )
374+ PipelineService .set_pipeline_schedule (
375+ dataflow .org , dataflow .deployment_id , "active"
376+ )
377+
366378 logger .info (f"Updated scheduled pipeline { dataflow .deployment_name } " )
367379
368380 except Exception as e :
0 commit comments