Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions src/prefect/flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ class BaseFlowRunEngine(Generic[P, R]):
_is_started: bool = False
short_circuit: bool = False
_flow_run_name_set: bool = False
_flow_executed: bool = False
_telemetry: RunTelemetry = field(default_factory=RunTelemetry)

def __post_init__(self) -> None:
Expand Down Expand Up @@ -509,6 +510,15 @@ def set_state(self, state: State, force: bool = False) -> State:
if self.short_circuit:
return self.state

# Capture lease_id from CURRENT state before transition
# The server doesn't include deployment_concurrency_lease_id in the response state,
# so we must read it from the current state before propose_state_sync overwrites it
lease_id_to_release = None
if state.is_final() and self.flow_run.state:
lease_id_to_release = (
self.flow_run.state.state_details.deployment_concurrency_lease_id
)

state = propose_state_sync(
self.client, state, flow_run_id=self.flow_run.id, force=force
) # type: ignore
Expand All @@ -519,6 +529,20 @@ def set_state(self, state: State, force: bool = False) -> State:
self._telemetry.update_state(state)
self.call_hooks(state)

# Explicitly release concurrency lease after successful transition to terminal state
if state.is_final() and lease_id_to_release:
try:
self.client.release_concurrency_slots_with_lease(lease_id_to_release)
self.logger.debug(
f"Released concurrency lease {lease_id_to_release} after state transition to {state.type.name}"
)
except Exception as exc:
# Log but don't fail the flow run if lease release fails
self.logger.warning(
f"Failed to release concurrency lease {lease_id_to_release}: {exc}",
exc_info=True,
)

return state

def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":
Expand Down Expand Up @@ -548,6 +572,7 @@ def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":
return _result

def handle_success(self, result: R) -> R:
self._flow_executed = True
result_store = getattr(FlowRunContext.get(), "result_store", None)
if result_store is None:
raise ValueError("Result store is not set")
Expand Down Expand Up @@ -932,7 +957,11 @@ def initialize_run(self):
raise
except BaseException as exc:
# We don't want to crash a flow run if the user code finished executing
if self.flow_run.state and not self.flow_run.state.is_final():
if (
self.flow_run.state
and not self.flow_run.state.is_final()
and not self._flow_executed
):
# BaseExceptions are caught and handled as crashes
self.handle_crash(exc)
raise
Expand Down Expand Up @@ -1113,6 +1142,15 @@ async def set_state(self, state: State, force: bool = False) -> State:
if self.short_circuit:
return self.state

# Capture lease_id from CURRENT state before transition
# The server doesn't include deployment_concurrency_lease_id in the response state,
# so we must read it from the current state before propose_state overwrites it
lease_id_to_release = None
if state.is_final() and self.flow_run.state:
lease_id_to_release = (
self.flow_run.state.state_details.deployment_concurrency_lease_id
)

state = await propose_state(
self.client, state, flow_run_id=self.flow_run.id, force=force
) # type: ignore
Expand All @@ -1123,6 +1161,22 @@ async def set_state(self, state: State, force: bool = False) -> State:
self._telemetry.update_state(state)
await self.call_hooks(state)

# Explicitly release concurrency lease after successful transition to terminal state
if state.is_final() and lease_id_to_release:
try:
await self.client.release_concurrency_slots_with_lease(
lease_id_to_release
)
self.logger.debug(
f"Released concurrency lease {lease_id_to_release} after state transition to {state.type.name}"
)
except Exception as exc:
# Log but don't fail the flow run if lease release fails
self.logger.warning(
f"Failed to release concurrency lease {lease_id_to_release}: {exc}",
exc_info=True,
)

return state

async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":
Expand Down Expand Up @@ -1151,6 +1205,7 @@ async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]"
return await self.state.aresult(raise_on_failure=raise_on_failure) # type: ignore

async def handle_success(self, result: R) -> R:
self._flow_executed = True
result_store = getattr(FlowRunContext.get(), "result_store", None)
if result_store is None:
raise ValueError("Result store is not set")
Expand Down Expand Up @@ -1537,7 +1592,11 @@ async def initialize_run(self):
raise
except BaseException as exc:
# We don't want to crash a flow run if the user code finished executing
if self.flow_run.state and not self.flow_run.state.is_final():
if (
self.flow_run.state
and not self.flow_run.state.is_final()
and not self._flow_executed
):
# BaseExceptions are caught and handled as crashes
await self.handle_crash(exc)
raise
Expand Down
34 changes: 28 additions & 6 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from prefect.server.exceptions import ObjectNotFoundError
from prefect.server.models import concurrency_limits, concurrency_limits_v2, deployments
from prefect.server.orchestration.dependencies import (
MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE,
MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING,
WORKER_VERSIONS_THAT_MANAGE_DEPLOYMENT_CONCURRENCY,
)
Expand Down Expand Up @@ -664,15 +665,17 @@ async def cleanup( # type: ignore
if not deployment or not deployment.concurrency_limit_id:
return

await concurrency_limits_v2.bulk_decrement_active_slots(
session=context.session,
concurrency_limit_ids=[deployment.concurrency_limit_id],
slots=1,
)
# Only decrement active slots if a lease was actually acquired
# (i.e., if deployment_concurrency_lease_id exists in validated_state)
if (
validated_state
and validated_state.state_details.deployment_concurrency_lease_id
):
await concurrency_limits_v2.bulk_decrement_active_slots(
session=context.session,
concurrency_limit_ids=[deployment.concurrency_limit_id],
slots=1,
)
lease_storage = get_concurrency_lease_storage()
await lease_storage.revoke_lease(
lease_id=validated_state.state_details.deployment_concurrency_lease_id,
Expand Down Expand Up @@ -842,14 +845,33 @@ class ReleaseFlowConcurrencySlots(FlowRunUniversalTransform):
"""
Releases deployment concurrency slots held by a flow run.

For backwards compatibility:
- Old clients (< 3.6.23): Server auto-releases leases during state transitions
- New clients (>= 3.6.23): Clients explicitly release leases after successful
state transitions, so server skips auto-release to avoid race conditions
with client-side renewal background task.

This rule releases a concurrency slot for a deployment when a flow run
transitions out of the Running or Cancelling state.
transitions out of the Running, Cancelling, or Pending state.
"""

async def after_transition(
self,
context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy],
) -> None:
# Skip auto-release for new clients (>= 3.6.23) that handle lease release themselves
# If client_version is None, assume old client (safe default, enables auto-release)
if context.client_version:
client_version = (
Version(context.client_version)
if isinstance(context.client_version, str)
else context.client_version
)
if client_version >= MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE:
# New client will explicitly release the lease after successful state transition
return

# OLD BEHAVIOR: Server auto-releases leases for old clients
if self.nullified_transition():
return

Expand Down
3 changes: 3 additions & 0 deletions src/prefect/server/orchestration/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class OrchestrationDependencies(TypedDict):
}

MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING = Version("3.4.11")
# Clients >= this version explicitly release deployment concurrency leases themselves,
# so the server skips auto-release to avoid race conditions with client-side renewal
MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE = Version("3.6.23")


async def provide_task_policy() -> type[TaskRunOrchestrationPolicy]:
Expand Down
Loading
Loading