Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def priority() -> list[
]:
return [
EnsureOnlyScheduledFlowsMarkedLate,
EnforceDeploymentConcurrencyOnLate,
InstrumentFlowRunStateTransitions,
]

Expand Down Expand Up @@ -1707,6 +1708,69 @@ async def before_transition(
)


class EnforceDeploymentConcurrencyOnLate(FlowRunOrchestrationRule):
    """Apply the CANCEL_NEW deployment collision strategy to Late transitions.

    CANCEL_NEW is ordinarily enforced on the * -> PENDING transition (by
    SecureFlowConcurrencySlots). A run that never reaches PENDING because it
    goes late first would otherwise linger in a Late state. This rule closes
    that gap: when a Late state is proposed for a run on a CANCEL_NEW
    deployment whose concurrency limit is fully occupied, the transition is
    rejected and replaced with a Cancelled state.
    """

    FROM_STATES = {StateType.SCHEDULED}
    TO_STATES = {StateType.SCHEDULED}

    async def before_transition(
        self,
        initial_state: states.State[Any] | None,
        proposed_state: states.State[Any] | None,
        context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
    ) -> None:
        # Only act on a concrete SCHEDULED -> SCHEDULED("Late") proposal.
        if initial_state is None or proposed_state is None:
            return
        if not proposed_state.is_scheduled() or proposed_state.name != "Late":
            return

        # Ad-hoc runs have no deployment, hence no deployment concurrency.
        deployment_id = context.run.deployment_id
        if not deployment_id:
            return

        deployment = await deployments.read_deployment(
            session=context.session,
            deployment_id=deployment_id,
        )
        if not deployment or not deployment.concurrency_limit_id:
            return

        # concurrency_options may surface as a raw dict; normalize to the model.
        options = deployment.concurrency_options
        if isinstance(options, dict):
            options = core.ConcurrencyOptions.model_validate(options)
        if not options:
            return
        if options.collision_strategy != core.ConcurrencyLimitStrategy.CANCEL_NEW:
            return

        limit = deployment.global_concurrency_limit
        if not limit:
            return

        # Fully occupied limit + CANCEL_NEW: cancel instead of marking Late.
        if limit.active_slots >= limit.limit:
            await self.reject_transition(
                state=states.Cancelled(message="Deployment concurrency limit reached."),
                reason=(
                    "Deployment concurrency limit is full and uses the"
                    " CANCEL_NEW strategy."
                ),
            )


class PreventRunningTasksFromStoppedFlows(TaskRunOrchestrationRule):
"""
Prevents running tasks from stopped flows.
Expand Down
160 changes: 160 additions & 0 deletions tests/server/services/test_late_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,163 @@ async def test_monitor_query_filters_already_late_runs(session, late_run, db):
# The monitor query would not select this run again because:
# - state_name is now "Late", not "Scheduled"
# This is tested by verifying the query filter behavior, not by calling the task twice


async def test_cancels_late_run_when_deployment_has_cancel_new_and_limit_is_full(
    session, flow, db
):
    """A run that would go Late on a CANCEL_NEW deployment whose concurrency
    limit is fully occupied should be cancelled by mark_flow_run_late rather
    than transitioned to Late.

    Regression test for https://github.com/PrefectHQ/prefect/issues/21060
    """
    # Deployment with a single concurrency slot and the CANCEL_NEW strategy.
    dep = await models.deployments.create_deployment(
        session=session,
        deployment=schemas.core.Deployment(
            name=f"cancel-new-deployment-{uuid4()}",
            flow_id=flow.id,
            concurrency_limit=1,
            concurrency_options={
                "collision_strategy": schemas.core.ConcurrencyLimitStrategy.CANCEL_NEW,
            },
        ),
    )
    await session.flush()

    # A RUNNING run fills the only slot ...
    await models.flow_runs.create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(
            flow_id=flow.id,
            deployment_id=dep.id,
            state=schemas.states.Running(),
        ),
    )
    # ... and the slot is explicitly acquired on the limit record itself.
    if dep.concurrency_limit_id:
        await models.concurrency_limits_v2.bulk_increment_active_slots(
            session=session,
            concurrency_limit_ids=[dep.concurrency_limit_id],
            slots=1,
        )

    # A scheduled run whose start time has already passed (would go Late).
    overdue_run = await models.flow_runs.create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(
            flow_id=flow.id,
            deployment_id=dep.id,
            state=schemas.states.Scheduled(
                scheduled_time=datetime.now(timezone.utc) - timedelta(minutes=1)
            ),
        ),
    )
    await session.commit()

    assert overdue_run.state.name == "Scheduled"

    await mark_flow_run_late(overdue_run.id, db=db)

    await session.refresh(overdue_run)
    # CANCEL_NEW must win over the Late transition.
    assert overdue_run.state_type == schemas.states.StateType.CANCELLED, (
        f"Expected CANCELLED but got {overdue_run.state_type}/{overdue_run.state_name}. "
        "CANCEL_NEW should proactively cancel runs that would go Late when the "
        "deployment concurrency limit is full."
    )


async def test_marks_late_not_cancelled_when_cancel_new_but_limit_not_full(
    session, flow, db
):
    """With CANCEL_NEW configured but a free concurrency slot available, an
    overdue run should still be marked Late rather than cancelled.

    Regression test for https://github.com/PrefectHQ/prefect/issues/21060
    """
    dep = await models.deployments.create_deployment(
        session=session,
        deployment=schemas.core.Deployment(
            name=f"cancel-new-available-{uuid4()}",
            flow_id=flow.id,
            concurrency_limit=1,
            concurrency_options={
                "collision_strategy": schemas.core.ConcurrencyLimitStrategy.CANCEL_NEW,
            },
        ),
    )
    await session.flush()

    # Nothing is running, so the single concurrency slot remains free.
    overdue_run = await models.flow_runs.create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(
            flow_id=flow.id,
            deployment_id=dep.id,
            state=schemas.states.Scheduled(
                scheduled_time=datetime.now(timezone.utc) - timedelta(minutes=1)
            ),
        ),
    )
    await session.commit()

    await mark_flow_run_late(overdue_run.id, db=db)

    await session.refresh(overdue_run)
    # With capacity available, the normal Late transition should proceed.
    assert overdue_run.state_name == "Late"


async def test_marks_late_when_deployment_has_enqueue_strategy(session, flow, db):
    """Deployments using the ENQUEUE strategy must always mark overdue runs
    Late — even with the concurrency limit fully occupied — never cancel them.

    Regression test for https://github.com/PrefectHQ/prefect/issues/21060
    """
    dep = await models.deployments.create_deployment(
        session=session,
        deployment=schemas.core.Deployment(
            name=f"enqueue-deployment-{uuid4()}",
            flow_id=flow.id,
            concurrency_limit=1,
            concurrency_options={
                "collision_strategy": schemas.core.ConcurrencyLimitStrategy.ENQUEUE,
            },
        ),
    )
    await session.flush()

    # Fill the single slot with a RUNNING run and acquire it on the limit.
    await models.flow_runs.create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(
            flow_id=flow.id,
            deployment_id=dep.id,
            state=schemas.states.Running(),
        ),
    )
    if dep.concurrency_limit_id:
        await models.concurrency_limits_v2.bulk_increment_active_slots(
            session=session,
            concurrency_limit_ids=[dep.concurrency_limit_id],
            slots=1,
        )

    overdue_run = await models.flow_runs.create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(
            flow_id=flow.id,
            deployment_id=dep.id,
            state=schemas.states.Scheduled(
                scheduled_time=datetime.now(timezone.utc) - timedelta(minutes=1)
            ),
        ),
    )
    await session.commit()

    await mark_flow_run_late(overdue_run.id, db=db)

    await session.refresh(overdue_run)
    # ENQUEUE never cancels: the overdue run simply becomes Late.
    assert overdue_run.state_name == "Late"
Loading