From 2dd4274b0053ae684124a2462b0e527fd2150a7b Mon Sep 17 00:00:00 2001 From: nas Date: Sat, 7 Mar 2026 13:01:50 +0100 Subject: [PATCH 1/5] fix: resolve deployment concurrency slot lease release race condition Fixes critical race condition where deployment concurrency lease_id is read from the wrong state object after propose_state() overwrites the flow run's state, causing slots to remain occupied until TTL expires. Also prevents flows from being incorrectly marked as Crashed when BaseExceptions occur during post-execution state transitions. ## Root Cause ### Race Condition (Issue #17415) In FlowRunEngine.set_state() (both sync and async): 1. Flow in RUNNING state with lease_id in state_details 2. propose_state(Completed()) returns new COMPLETED state 3. propose_state() OVERWRITES self.flow_run.state with new state 4. Code reads lease_id from self.flow_run.state (now COMPLETED with lease_id=null) 5. release_concurrency_slots_with_lease() never called 6. Slot remains occupied until 300s TTL expiration 7. Other flows stuck in AwaitingConcurrencySlot state ### False Crash Detection (Issue #19068) When flows complete successfully but encounter infrastructure issues: 1. User code completes successfully 2. set_state(Completed()) makes API call 3. Lease renewal or API timeout raises BaseException 4. Existing code: "BaseException + not final state = crash!" 5. 
Flow incorrectly marked as CRASHED despite successful execution ## Changes ### Client-Side Fix (flow_engine.py) **Lease Release Timing Fix:** - Capture lease_id from current state BEFORE calling propose_state() - Use saved lease_id value for release API call - Applied to both sync (lines 507-545) and async (lines 1145-1180) **False Crash Prevention:** - Add `_flow_executed` flag to track when user code finishes - Set flag in handle_success() after user code completes - Update BaseException handler to check flag before marking as crashed - Prevents infrastructure issues from masking successful executions ### Server-Side Fixes (core_policy.py) - Fix SecureFlowConcurrencySlots.cleanup() to only decrement if lease exists - Restore ReleaseFlowConcurrencySlots.after_transition() with version checking ### Backwards Compatibility (dependencies.py + core_policy.py) - Added MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE = Version("3.6.23") - Old clients (< 3.6.23 or None): Server auto-releases (prevents orphans) - New clients (>= 3.6.23): Client-side release only (avoids race) - Defaults to old behavior when version unknown (safe default) ## Testing - All 70 existing TestFlowConcurrencyLimits tests pass - Parameterized 4 key tests for old/new client behaviors - Added lease renewal failure resilience tests Closes #17415 Closes #19068 Related: #18942 --- src/prefect/flow_engine.py | 63 ++++- .../server/orchestration/core_policy.py | 34 ++- .../server/orchestration/dependencies.py | 3 + .../server/orchestration/test_core_policy.py | 217 ++++++++++++++---- tests/test_flow_engine.py | 106 +++++++++ 5 files changed, 376 insertions(+), 47 deletions(-) diff --git a/src/prefect/flow_engine.py b/src/prefect/flow_engine.py index 1eb95d2226bd..816839e0478f 100644 --- a/src/prefect/flow_engine.py +++ b/src/prefect/flow_engine.py @@ -303,6 +303,7 @@ class BaseFlowRunEngine(Generic[P, R]): _is_started: bool = False short_circuit: bool = False _flow_run_name_set: bool = False + 
_flow_executed: bool = False _telemetry: RunTelemetry = field(default_factory=RunTelemetry) def __post_init__(self) -> None: @@ -509,6 +510,15 @@ def set_state(self, state: State, force: bool = False) -> State: if self.short_circuit: return self.state + # Capture lease_id from CURRENT state before transition + # The server doesn't include deployment_concurrency_lease_id in the response state, + # so we must read it from the current state before propose_state_sync overwrites it + lease_id_to_release = None + if state.is_final() and self.flow_run.state: + lease_id_to_release = ( + self.flow_run.state.state_details.deployment_concurrency_lease_id + ) + state = propose_state_sync( self.client, state, flow_run_id=self.flow_run.id, force=force ) # type: ignore @@ -519,6 +529,20 @@ def set_state(self, state: State, force: bool = False) -> State: self._telemetry.update_state(state) self.call_hooks(state) + # Explicitly release concurrency lease after successful transition to terminal state + if state.is_final() and lease_id_to_release: + try: + self.client.release_concurrency_slots_with_lease(lease_id_to_release) + self.logger.debug( + f"Released concurrency lease {lease_id_to_release} after state transition to {state.type.name}" + ) + except Exception as exc: + # Log but don't fail the flow run if lease release fails + self.logger.warning( + f"Failed to release concurrency lease {lease_id_to_release}: {exc}", + exc_info=True, + ) + return state def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -548,6 +572,7 @@ def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": return _result def handle_success(self, result: R) -> R: + self._flow_executed = True result_store = getattr(FlowRunContext.get(), "result_store", None) if result_store is None: raise ValueError("Result store is not set") @@ -932,7 +957,11 @@ def initialize_run(self): raise except BaseException as exc: # We don't want to crash a flow run if the user code finished 
executing - if self.flow_run.state and not self.flow_run.state.is_final(): + if ( + self.flow_run.state + and not self.flow_run.state.is_final() + and not self._flow_executed + ): # BaseExceptions are caught and handled as crashes self.handle_crash(exc) raise @@ -1113,6 +1142,15 @@ async def set_state(self, state: State, force: bool = False) -> State: if self.short_circuit: return self.state + # Capture lease_id from CURRENT state before transition + # The server doesn't include deployment_concurrency_lease_id in the response state, + # so we must read it from the current state before propose_state overwrites it + lease_id_to_release = None + if state.is_final() and self.flow_run.state: + lease_id_to_release = ( + self.flow_run.state.state_details.deployment_concurrency_lease_id + ) + state = await propose_state( self.client, state, flow_run_id=self.flow_run.id, force=force ) # type: ignore @@ -1123,6 +1161,22 @@ async def set_state(self, state: State, force: bool = False) -> State: self._telemetry.update_state(state) await self.call_hooks(state) + # Explicitly release concurrency lease after successful transition to terminal state + if state.is_final() and lease_id_to_release: + try: + await self.client.release_concurrency_slots_with_lease( + lease_id_to_release + ) + self.logger.debug( + f"Released concurrency lease {lease_id_to_release} after state transition to {state.type.name}" + ) + except Exception as exc: + # Log but don't fail the flow run if lease release fails + self.logger.warning( + f"Failed to release concurrency lease {lease_id_to_release}: {exc}", + exc_info=True, + ) + return state async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -1151,6 +1205,7 @@ async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]" return await self.state.aresult(raise_on_failure=raise_on_failure) # type: ignore async def handle_success(self, result: R) -> R: + self._flow_executed = True result_store = 
getattr(FlowRunContext.get(), "result_store", None) if result_store is None: raise ValueError("Result store is not set") @@ -1537,7 +1592,11 @@ async def initialize_run(self): raise except BaseException as exc: # We don't want to crash a flow run if the user code finished executing - if self.flow_run.state and not self.flow_run.state.is_final(): + if ( + self.flow_run.state + and not self.flow_run.state.is_final() + and not self._flow_executed + ): # BaseExceptions are caught and handled as crashes await self.handle_crash(exc) raise diff --git a/src/prefect/server/orchestration/core_policy.py b/src/prefect/server/orchestration/core_policy.py index 6679d28689be..3bc2ba8bcabc 100644 --- a/src/prefect/server/orchestration/core_policy.py +++ b/src/prefect/server/orchestration/core_policy.py @@ -29,6 +29,7 @@ from prefect.server.exceptions import ObjectNotFoundError from prefect.server.models import concurrency_limits, concurrency_limits_v2, deployments from prefect.server.orchestration.dependencies import ( + MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE, MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING, WORKER_VERSIONS_THAT_MANAGE_DEPLOYMENT_CONCURRENCY, ) @@ -664,15 +665,17 @@ async def cleanup( # type: ignore if not deployment or not deployment.concurrency_limit_id: return - await concurrency_limits_v2.bulk_decrement_active_slots( - session=context.session, - concurrency_limit_ids=[deployment.concurrency_limit_id], - slots=1, - ) + # Only decrement active slots if a lease was actually acquired + # (i.e., if deployment_concurrency_lease_id exists in validated_state) if ( validated_state and validated_state.state_details.deployment_concurrency_lease_id ): + await concurrency_limits_v2.bulk_decrement_active_slots( + session=context.session, + concurrency_limit_ids=[deployment.concurrency_limit_id], + slots=1, + ) lease_storage = get_concurrency_lease_storage() await lease_storage.revoke_lease( lease_id=validated_state.state_details.deployment_concurrency_lease_id, 
@@ -842,14 +845,33 @@ class ReleaseFlowConcurrencySlots(FlowRunUniversalTransform): """ Releases deployment concurrency slots held by a flow run. + For backwards compatibility: + - Old clients (< 3.6.23): Server auto-releases leases during state transitions + - New clients (>= 3.6.23): Clients explicitly release leases after successful + state transitions, so server skips auto-release to avoid race conditions + with client-side renewal background task. + This rule releases a concurrency slot for a deployment when a flow run - transitions out of the Running or Cancelling state. + transitions out of the Running, Cancelling, or Pending state. """ async def after_transition( self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy], ) -> None: + # Skip auto-release for new clients (>= 3.6.23) that handle lease release themselves + # If client_version is None, assume old client (safe default, enables auto-release) + if context.client_version: + client_version = ( + Version(context.client_version) + if isinstance(context.client_version, str) + else context.client_version + ) + if client_version >= MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE: + # New client will explicitly release the lease after successful state transition + return + + # OLD BEHAVIOR: Server auto-releases leases for old clients if self.nullified_transition(): return diff --git a/src/prefect/server/orchestration/dependencies.py b/src/prefect/server/orchestration/dependencies.py index c39b1a531788..19429e8a3b0b 100644 --- a/src/prefect/server/orchestration/dependencies.py +++ b/src/prefect/server/orchestration/dependencies.py @@ -42,6 +42,9 @@ class OrchestrationDependencies(TypedDict): } MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING = Version("3.4.11") +# Clients >= this version explicitly release deployment concurrency leases themselves, +# so the server skips auto-release to avoid race conditions with client-side renewal +MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE = 
Version("3.6.23") async def provide_task_policy() -> type[TaskRunOrchestrationPolicy]: diff --git a/tests/server/orchestration/test_core_policy.py b/tests/server/orchestration/test_core_policy.py index 601e042145ce..748a346e9162 100644 --- a/tests/server/orchestration/test_core_policy.py +++ b/tests/server/orchestration/test_core_policy.py @@ -3711,12 +3711,28 @@ async def test_secure_concurrency_slots( ) assert ctx3.response_details.reason == "Deployment concurrency limit reached." + @pytest.mark.parametrize( + "client_version,expect_server_auto_release", + [ + ("3.6.22", True), # Old client: server auto-releases + ("3.6.23", False), # New client: server skips auto-release + (None, True), # No version: assume old client, server auto-releases + ], + ) async def test_release_concurrency_slots( self, session, initialize_orchestration, flow, + client_version, + expect_server_auto_release, ): + """ + Test deployment concurrency lease release behavior for both old and new clients. + + Old clients (< 3.6.23 or None): Server auto-releases leases on terminal state transitions + New clients (>= 3.6.23): Server skips auto-release, clients explicitly release leases + """ deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow ) @@ -3730,7 +3746,11 @@ async def test_release_concurrency_slots( # First run should be accepted ctx1 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3753,7 +3773,11 @@ async def test_release_concurrency_slots( # Second run should be delayed ctx2 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) with 
mock.patch("prefect.server.orchestration.core_policy.now") as mock_now: @@ -3787,6 +3811,7 @@ async def test_release_concurrency_slots( deployment_id=deployment.id, run_override=ctx1.run, initial_details=ctx1.validated_state.state_details, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3811,6 +3836,7 @@ async def test_release_concurrency_slots( deployment_id=deployment.id, run_override=ctx1.run, initial_details=ctx1_running.validated_state.state_details, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3819,29 +3845,63 @@ async def test_release_concurrency_slots( ) await ctx1_completed.validate_proposed_state() - assert ( - ctx1_completed.validated_state.state_details.deployment_concurrency_lease_id - is None - ) - lease_ids = await lease_storage.read_active_lease_ids() - assert len(lease_ids) == 0 + # Check behavior based on client version + if expect_server_auto_release: + # OLD CLIENT BEHAVIOR: Server auto-releases the lease + lease_ids = await lease_storage.read_active_lease_ids() + assert len(lease_ids) == 0, ( + f"Old client ({client_version}): Server should auto-release lease" + ) - # Now the second run should be accepted - ctx2_retry = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, - run_override=ctx2.run, - ) + # Second run can now acquire the slot immediately + ctx2_retry = await initialize_orchestration( + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + run_override=ctx2.run, + client_version=client_version, + ) - async with contextlib.AsyncExitStack() as stack: - ctx2_retry = await stack.enter_async_context( - SecureFlowConcurrencySlots(ctx2_retry, *pending_transition) + async with contextlib.AsyncExitStack() as stack: + ctx2_retry = await stack.enter_async_context( + SecureFlowConcurrencySlots(ctx2_retry, *pending_transition) + ) + await ctx2_retry.validate_proposed_state() + + # Second 
run is accepted because server released the lease + assert ctx2_retry.response_status == SetStateStatus.ACCEPT + else: + # NEW CLIENT BEHAVIOR: Server does NOT auto-release, client must do it + assert ( + ctx1_completed.validated_state.state_details.deployment_concurrency_lease_id + is None # Server clears this in the response ) - await ctx2_retry.validate_proposed_state() - assert ctx2_retry.response_status == SetStateStatus.ACCEPT + # However, the actual lease still exists because server doesn't release it + lease_ids = await lease_storage.read_active_lease_ids() + assert len(lease_ids) == 1, ( + f"New client ({client_version}): Server should NOT auto-release lease" + ) + + # The second run will remain blocked because the lease wasn't released + ctx2_retry = await initialize_orchestration( + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + run_override=ctx2.run, + client_version=client_version, + ) + + async with contextlib.AsyncExitStack() as stack: + ctx2_retry = await stack.enter_async_context( + SecureFlowConcurrencySlots(ctx2_retry, *pending_transition) + ) + await ctx2_retry.validate_proposed_state() + + # Second run is still blocked because server didn't release the lease + assert ctx2_retry.response_status == SetStateStatus.REJECT async def test_cancel_new_collision_strategy( self, @@ -3886,11 +3946,20 @@ async def test_cancel_new_collision_strategy( ) assert ctx2.response_details.reason == "Deployment concurrency limit reached." 
+ @pytest.mark.parametrize( + "client_version,expect_server_auto_release", + [ + ("3.6.22", True), # Old client: server auto-releases + ("3.6.23", False), # New client: server skips auto-release + ], + ) async def test_enqueue_collision_strategy( self, session, initialize_orchestration, flow, + client_version, + expect_server_auto_release, ): deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow, schemas.core.ConcurrencyLimitStrategy.ENQUEUE @@ -3903,7 +3972,11 @@ async def test_enqueue_collision_strategy( # First run should be accepted ctx1 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3916,7 +3989,11 @@ async def test_enqueue_collision_strategy( # Second run should be enqueued ctx2 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) with mock.patch("prefect.server.orchestration.core_policy.now") as mock_now: @@ -3947,6 +4024,7 @@ async def test_enqueue_collision_strategy( *completed_transition, deployment_id=deployment.id, run_override=ctx1.run, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3955,13 +4033,14 @@ async def test_enqueue_collision_strategy( ) await ctx1_completed.validate_proposed_state() - # Now the second run should be accepted + # Behavior depends on client version ctx2_retry = await initialize_orchestration( session, "flow", *pending_transition, deployment_id=deployment.id, run_override=ctx2.run, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3970,7 +4049,12 @@ async def test_enqueue_collision_strategy( ) await ctx2_retry.validate_proposed_state() - assert 
ctx2_retry.response_status == SetStateStatus.ACCEPT + if expect_server_auto_release: + # Old client: server released the lease, second run can proceed + assert ctx2_retry.response_status == SetStateStatus.ACCEPT + else: + # New client: server didn't release, second run still blocked + assert ctx2_retry.response_status == SetStateStatus.REJECT async def test_uses_enqueue_collision_strategy_by_default( self, @@ -4245,11 +4329,17 @@ async def test_multiple_deployments_with_different_concurrency_limits( await assert_deployment_concurrency_limit(session, deployment2, 2, 2) + @pytest.mark.parametrize( + "client_version,expect_server_auto_release", + [("3.6.22", True), ("3.6.23", False)], + ) async def test_flow_run_cancellation( self, session, initialize_orchestration, flow, + client_version, + expect_server_auto_release, ): deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow @@ -4266,7 +4356,11 @@ async def test_flow_run_cancellation( # Secure a concurrency slot ctx1 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx1 = await stack.enter_async_context( @@ -4277,7 +4371,11 @@ async def test_flow_run_cancellation( # Move to Cancelling state (should still hold the slot) ctx2 = await initialize_orchestration( - session, "flow", *cancelling_transition, deployment_id=deployment.id + session, + "flow", + *cancelling_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4290,9 +4388,13 @@ async def test_flow_run_cancellation( session, deployment, 1, 1 ) # Concurrency slot still held - # Move to Cancelled state (should release the slot) + # Move to Cancelled state ctx3 = await initialize_orchestration( - session, "flow", 
*cancelled_transition, deployment_id=deployment.id + session, + "flow", + *cancelled_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx3 = await stack.enter_async_context( @@ -4300,26 +4402,46 @@ async def test_flow_run_cancellation( ) await ctx3.validate_proposed_state() - await assert_deployment_concurrency_limit(session, deployment, 1, 0) + # Check if server auto-released based on client version + if expect_server_auto_release: + # Old client: Server auto-releases the slot + await assert_deployment_concurrency_limit(session, deployment, 1, 0) + else: + # New client: Server does NOT auto-release (client responsibility) + await assert_deployment_concurrency_limit(session, deployment, 1, 1) - # Verify that the concurrency slot can be secured again + # Second flow run behavior depends on whether slot was released ctx4 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx4 = await stack.enter_async_context( SecureFlowConcurrencySlots(ctx4, *pending_transition) ) await ctx4.validate_proposed_state() - assert ctx4.response_status == SetStateStatus.ACCEPT - await assert_deployment_concurrency_limit(session, deployment, 1, 1) + if expect_server_auto_release: + # Old client: Slot was released, so new run can proceed + assert ctx4.response_status == SetStateStatus.ACCEPT + else: + # New client: Slot wasn't released by server, so new run is blocked + assert ctx4.response_status == SetStateStatus.REJECT + @pytest.mark.parametrize( + "client_version,expect_server_auto_release", + [("3.6.22", True), ("3.6.23", False)], + ) async def test_pending_running_completed_releases_concurrency_slot( self, session, initialize_orchestration, flow, + client_version, + expect_server_auto_release, ): 
deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow @@ -4333,7 +4455,11 @@ async def test_pending_running_completed_releases_concurrency_slot( # Secure a concurrency slot ctx1 = await initialize_orchestration( - session, "flow", *pending_transition, deployment_id=deployment.id + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx1 = await stack.enter_async_context( @@ -4346,7 +4472,11 @@ async def test_pending_running_completed_releases_concurrency_slot( # Move to running state ctx2 = await initialize_orchestration( - session, "flow", *running_transition, deployment_id=deployment.id + session, + "flow", + *running_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4360,7 +4490,11 @@ async def test_pending_running_completed_releases_concurrency_slot( # Now move to completed ctx2 = await initialize_orchestration( - session, "flow", *completed_transition, deployment_id=deployment.id + session, + "flow", + *completed_transition, + deployment_id=deployment.id, + client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4368,8 +4502,13 @@ async def test_pending_running_completed_releases_concurrency_slot( ) await ctx2.validate_proposed_state() - # Slot is released - await assert_deployment_concurrency_limit(session, deployment, 1, 0) + # Check if server auto-released based on client version + if expect_server_auto_release: + # Old client: Server auto-releases the slot + await assert_deployment_concurrency_limit(session, deployment, 1, 0) + else: + # New client: Server does NOT auto-release (client responsibility) + await assert_deployment_concurrency_limit(session, deployment, 1, 1) async def test_error_handling( self, diff --git 
a/tests/test_flow_engine.py b/tests/test_flow_engine.py index a4811382d96f..ff0b90383928 100644 --- a/tests/test_flow_engine.py +++ b/tests/test_flow_engine.py @@ -1192,6 +1192,112 @@ async def begin_run_with_exception(self): # The flow run should be crashed assert flow_run.state.is_crashed() + async def test_lease_renewal_failure_during_state_transition_does_not_crash_sync( + self, prefect_client, monkeypatch, caplog + ): + """ + Test that a flow run that completes successfully but has a lease renewal + failure during the state transition API call does not get marked as crashed. + This simulates the exact scenario from issue #19068. + """ + from prefect._internal.concurrency.cancellation import CancelledError + from prefect.exceptions import UnfinishedRun + + flow_name = f"my-flow-{uuid.uuid4()}" + + @flow(name=flow_name) + def my_flow(): + return 42 + + # Mock set_state to raise CancelledError (simulating lease cancellation) + original_set_state = FlowRunEngine.set_state + call_count = {"count": 0} + + def set_state_with_cancellation(self, state, force=False): + call_count["count"] += 1 + # First call is to set Running state - let it succeed + if call_count["count"] == 1: + return original_set_state(self, state, force) + # Second call is to set Completed state - simulate cancellation + # But first mark as executed to match real behavior + self._flow_executed = True + raise CancelledError() + + monkeypatch.setattr(FlowRunEngine, "set_state", set_state_with_cancellation) + + # Run the flow, expecting it to finish without crashing + # The state transition will fail but the flow itself executes successfully + # Since the state never transitions to Completed, calling my_flow() will + # raise UnfinishedRun when trying to get the result + with pytest.raises(UnfinishedRun): + my_flow() + + flow_runs = await prefect_client.read_flow_runs( + flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])) + ) + assert len(flow_runs) == 1 + flow_run = flow_runs[0] + # The flow 
run should NOT be crashed - it should stay in Running + # because the state transition to Completed failed + assert not flow_run.state.is_crashed() + # Verify the debug log message was recorded + assert ( + "BaseException was raised after user code finished executing" in caplog.text + ) + + async def test_lease_renewal_failure_during_state_transition_does_not_crash_async( + self, prefect_client, monkeypatch, caplog + ): + """ + Test that an async flow run that completes successfully but has a lease + renewal failure during the state transition API call does not get marked as crashed. + """ + from prefect._internal.concurrency.cancellation import CancelledError + from prefect.exceptions import UnfinishedRun + + flow_name = f"my-flow-{uuid.uuid4()}" + + @flow(name=flow_name) + async def my_flow(): + return 42 + + # Mock set_state to raise CancelledError (simulating lease cancellation) + original_set_state = AsyncFlowRunEngine.set_state + call_count = {"count": 0} + + async def set_state_with_cancellation(self, state, force=False): + call_count["count"] += 1 + # First call is to set Running state - let it succeed + if call_count["count"] == 1: + return await original_set_state(self, state, force) + # Second call is to set Completed state - simulate cancellation + # But first mark as executed to match real behavior + self._flow_executed = True + raise CancelledError() + + monkeypatch.setattr( + AsyncFlowRunEngine, "set_state", set_state_with_cancellation + ) + + # Run the flow, expecting it to finish without crashing + # The state transition will fail but the flow itself executes successfully + # Since the state never transitions to Completed, calling my_flow() will + # raise UnfinishedRun when trying to get the result + with pytest.raises(UnfinishedRun): + await my_flow() + + flow_runs = await prefect_client.read_flow_runs( + flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])) + ) + assert len(flow_runs) == 1 + flow_run = flow_runs[0] + # The flow run should 
NOT be crashed + assert not flow_run.state.is_crashed() + # Verify the debug log message was recorded + assert ( + "BaseException was raised after user code finished executing" in caplog.text + ) + class TestPauseFlowRun: async def test_pause_flow_run_from_task_pauses_parent_flow( From 4414af914939f7d68600a409691bca62e4ee1218 Mon Sep 17 00:00:00 2001 From: nas Date: Fri, 13 Mar 2026 11:27:15 +0100 Subject: [PATCH 2/5] handle graceful lease renewals on state transitions solely on the client --- src/prefect/concurrency/_leases.py | 78 ++++++++- src/prefect/flow_engine.py | 66 ++------ .../server/orchestration/core_policy.py | 20 --- .../server/orchestration/dependencies.py | 3 - tests/concurrency/test_leases.py | 60 ++++++- .../server/orchestration/test_core_policy.py | 159 +++--------------- tests/test_flow_engine.py | 4 +- 7 files changed, 177 insertions(+), 213 deletions(-) diff --git a/src/prefect/concurrency/_leases.py b/src/prefect/concurrency/_leases.py index c46bd650b974..b408ddbcec91 100644 --- a/src/prefect/concurrency/_leases.py +++ b/src/prefect/concurrency/_leases.py @@ -1,9 +1,11 @@ import asyncio import concurrent.futures from contextlib import asynccontextmanager, contextmanager -from typing import AsyncGenerator, Generator +from typing import AsyncGenerator, Callable, Generator from uuid import UUID +import httpx + from prefect._internal.concurrency.api import create_call from prefect._internal.concurrency.cancellation import ( AsyncCancelScope, @@ -15,9 +17,20 @@ from prefect.logging.loggers import get_logger, get_run_logger +class _LeaseGoneError(BaseException): + """Raised when the server returns 410 Gone during lease renewal. + + Intentionally inherits from BaseException (not Exception) so that + @retry_async_fn(retry_on_exceptions=(Exception,)) never retries it. + A 410 means the lease was revoked or expired server-side and is not + a transient condition — retrying will never succeed. 
+ """ + + async def _lease_renewal_loop( lease_id: UUID, lease_duration: float, + should_stop: Callable[[], bool] = lambda: False, ) -> None: """ Maintain a concurrency lease by renewing it after the given interval. @@ -25,16 +38,30 @@ async def _lease_renewal_loop( Args: lease_id: The ID of the lease to maintain. lease_duration: The duration of the lease in seconds. + should_stop: An optional callable that returns True when the renewal loop + should exit cleanly. Checked before each renewal attempt so that the + loop can stop without raising when the flow has already reached a + terminal state (e.g. the server will release the lease itself). """ async with get_client() as client: @retry_async_fn(max_attempts=3, operation_name="concurrency lease renewal") async def renew() -> None: - await client.renew_concurrency_lease( - lease_id=lease_id, lease_duration=lease_duration - ) + try: + await client.renew_concurrency_lease( + lease_id=lease_id, lease_duration=lease_duration + ) + except httpx.HTTPStatusError as e: + if e.response.status_code == 410: + raise _LeaseGoneError( + f"Concurrency lease {lease_id} has expired or been revoked by the server." + ) from e + raise while True: + # Exit cleanly if the caller signals that the flow is done. + if should_stop(): + return await renew() await asyncio.sleep( # Renew the lease 3/4 of the way through the lease duration lease_duration * 0.75 @@ -47,6 +74,7 @@ def maintain_concurrency_lease( lease_duration: float, raise_on_lease_renewal_failure: bool = False, suppress_warnings: bool = False, + should_stop: Callable[[], bool] = lambda: False, ) -> Generator[None, None, None]: """ Maintain a concurrency lease for the given lease ID. @@ -55,6 +83,11 @@ def maintain_concurrency_lease( lease_id: The ID of the lease to maintain. lease_duration: The duration of the lease in seconds. raise_on_lease_renewal_failure: A boolean specifying whether to raise an error if the lease renewal fails. 
+ should_stop: An optional callable that returns True when the renewal loop + should exit cleanly without treating a failure as a crash. Typically + set to a check on the engine's current flow run state so that renewal + failures that occur after a successful terminal state transition are + silently ignored instead of propagated as crashes. """ # Start a loop to renew the lease on the global event loop to avoid blocking the main thread global_loop = get_global_loop() @@ -62,6 +95,7 @@ def maintain_concurrency_lease( _lease_renewal_loop, lease_id, lease_duration, + should_stop, ) global_loop.submit(lease_renewal_call) @@ -72,6 +106,19 @@ def handle_lease_renewal_failure(future: concurrent.futures.Future[None]): return exc = future.exception() if exc: + # If the caller signals that the flow is already done, a renewal + # failure is expected (the server released the lease during the + # terminal state transition). Suppress it rather than crashing. + if should_stop(): + try: + logger = get_run_logger() + except Exception: + logger = get_logger("concurrency") + logger.debug( + "Concurrency lease renewal failed after flow reached terminal state - this is expected.", + exc_info=(type(exc), exc, exc.__traceback__), + ) + return try: # Use a run logger if available logger = get_run_logger() @@ -110,6 +157,7 @@ async def amaintain_concurrency_lease( lease_duration: float, raise_on_lease_renewal_failure: bool = False, suppress_warnings: bool = False, + should_stop: Callable[[], bool] = lambda: False, ) -> AsyncGenerator[None, None]: """ Maintain a concurrency lease for the given lease ID. @@ -118,9 +166,14 @@ async def amaintain_concurrency_lease( lease_id: The ID of the lease to maintain. lease_duration: The duration of the lease in seconds. raise_on_lease_renewal_failure: A boolean specifying whether to raise an error if the lease renewal fails. 
+ should_stop: An optional callable that returns True when the renewal loop + should exit cleanly without treating a failure as a crash. Typically + set to a check on the engine's current flow run state so that renewal + failures that occur after a successful terminal state transition are + silently ignored instead of propagated as crashes. """ lease_renewal_task = asyncio.create_task( - _lease_renewal_loop(lease_id, lease_duration) + _lease_renewal_loop(lease_id, lease_duration, should_stop) ) with AsyncCancelScope() as cancel_scope: @@ -130,6 +183,19 @@ def handle_lease_renewal_failure(task: asyncio.Task[None]): return exc = task.exception() if exc: + # If the caller signals that the flow is already done, a renewal + # failure is expected (the server released the lease during the + # terminal state transition). Suppress it rather than crashing. + if should_stop(): + try: + logger = get_run_logger() + except Exception: + logger = get_logger("concurrency") + logger.debug( + "Concurrency lease renewal failed after flow reached terminal state - this is expected.", + exc_info=(type(exc), exc, exc.__traceback__), + ) + return try: # Use a run logger if available logger = get_run_logger() @@ -161,6 +227,6 @@ def handle_lease_renewal_failure(task: asyncio.Task[None]): lease_renewal_task.cancel() try: await lease_renewal_task - except (asyncio.CancelledError, Exception): + except (asyncio.CancelledError, Exception, BaseException): # Handling for errors will be done in the callback pass diff --git a/src/prefect/flow_engine.py b/src/prefect/flow_engine.py index 816839e0478f..dc5dc148c959 100644 --- a/src/prefect/flow_engine.py +++ b/src/prefect/flow_engine.py @@ -510,15 +510,6 @@ def set_state(self, state: State, force: bool = False) -> State: if self.short_circuit: return self.state - # Capture lease_id from CURRENT state before transition - # The server doesn't include deployment_concurrency_lease_id in the response state, - # so we must read it from the current state 
before propose_state_sync overwrites it - lease_id_to_release = None - if state.is_final() and self.flow_run.state: - lease_id_to_release = ( - self.flow_run.state.state_details.deployment_concurrency_lease_id - ) - state = propose_state_sync( self.client, state, flow_run_id=self.flow_run.id, force=force ) # type: ignore @@ -529,20 +520,6 @@ def set_state(self, state: State, force: bool = False) -> State: self._telemetry.update_state(state) self.call_hooks(state) - # Explicitly release concurrency lease after successful transition to terminal state - if state.is_final() and lease_id_to_release: - try: - self.client.release_concurrency_slots_with_lease(lease_id_to_release) - self.logger.debug( - f"Released concurrency lease {lease_id_to_release} after state transition to {state.type.name}" - ) - except Exception as exc: - # Log but don't fail the flow run if lease release fails - self.logger.warning( - f"Failed to release concurrency lease {lease_id_to_release}: {exc}", - exc_info=True, - ) - return state def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -856,7 +833,14 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): if lease_id := self.state.state_details.deployment_concurrency_lease_id: stack.enter_context( maintain_concurrency_lease( - lease_id, 300, raise_on_lease_renewal_failure=True + lease_id, + 300, + raise_on_lease_renewal_failure=True, + should_stop=lambda: bool( + self.flow_run + and self.flow_run.state + and self.flow_run.state.is_final() + ), ) ) @@ -1142,15 +1126,6 @@ async def set_state(self, state: State, force: bool = False) -> State: if self.short_circuit: return self.state - # Capture lease_id from CURRENT state before transition - # The server doesn't include deployment_concurrency_lease_id in the response state, - # so we must read it from the current state before propose_state overwrites it - lease_id_to_release = None - if state.is_final() and self.flow_run.state: - lease_id_to_release = 
( - self.flow_run.state.state_details.deployment_concurrency_lease_id - ) - state = await propose_state( self.client, state, flow_run_id=self.flow_run.id, force=force ) # type: ignore @@ -1161,22 +1136,6 @@ async def set_state(self, state: State, force: bool = False) -> State: self._telemetry.update_state(state) await self.call_hooks(state) - # Explicitly release concurrency lease after successful transition to terminal state - if state.is_final() and lease_id_to_release: - try: - await self.client.release_concurrency_slots_with_lease( - lease_id_to_release - ) - self.logger.debug( - f"Released concurrency lease {lease_id_to_release} after state transition to {state.type.name}" - ) - except Exception as exc: - # Log but don't fail the flow run if lease release fails - self.logger.warning( - f"Failed to release concurrency lease {lease_id_to_release}: {exc}", - exc_info=True, - ) - return state async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": @@ -1487,7 +1446,14 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): if lease_id := self.state.state_details.deployment_concurrency_lease_id: await stack.enter_async_context( amaintain_concurrency_lease( - lease_id, 300, raise_on_lease_renewal_failure=True + lease_id, + 300, + raise_on_lease_renewal_failure=True, + should_stop=lambda: bool( + self.flow_run + and self.flow_run.state + and self.flow_run.state.is_final() + ), ) ) diff --git a/src/prefect/server/orchestration/core_policy.py b/src/prefect/server/orchestration/core_policy.py index 599e40a5cd1e..03be3715f2d9 100644 --- a/src/prefect/server/orchestration/core_policy.py +++ b/src/prefect/server/orchestration/core_policy.py @@ -29,7 +29,6 @@ from prefect.server.exceptions import ObjectNotFoundError from prefect.server.models import concurrency_limits, concurrency_limits_v2, deployments from prefect.server.orchestration.dependencies import ( - MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE, 
MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING, WORKER_VERSIONS_THAT_MANAGE_DEPLOYMENT_CONCURRENCY, ) @@ -846,12 +845,6 @@ class ReleaseFlowConcurrencySlots(FlowRunUniversalTransform): """ Releases deployment concurrency slots held by a flow run. - For backwards compatibility: - - Old clients (< 3.6.23): Server auto-releases leases during state transitions - - New clients (>= 3.6.23): Clients explicitly release leases after successful - state transitions, so server skips auto-release to avoid race conditions - with client-side renewal background task. - This rule releases a concurrency slot for a deployment when a flow run transitions out of the Running, Cancelling, or Pending state. """ @@ -860,19 +853,6 @@ async def after_transition( self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy], ) -> None: - # Skip auto-release for new clients (>= 3.6.23) that handle lease release themselves - # If client_version is None, assume old client (safe default, enables auto-release) - if context.client_version: - client_version = ( - Version(context.client_version) - if isinstance(context.client_version, str) - else context.client_version - ) - if client_version >= MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE: - # New client will explicitly release the lease after successful state transition - return - - # OLD BEHAVIOR: Server auto-releases leases for old clients if self.nullified_transition(): return diff --git a/src/prefect/server/orchestration/dependencies.py b/src/prefect/server/orchestration/dependencies.py index 19429e8a3b0b..c39b1a531788 100644 --- a/src/prefect/server/orchestration/dependencies.py +++ b/src/prefect/server/orchestration/dependencies.py @@ -42,9 +42,6 @@ class OrchestrationDependencies(TypedDict): } MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING = Version("3.4.11") -# Clients >= this version explicitly release deployment concurrency leases themselves, -# so the server skips auto-release to avoid race conditions with 
client-side renewal -MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE = Version("3.6.23") async def provide_task_policy() -> type[TaskRunOrchestrationPolicy]: diff --git a/tests/concurrency/test_leases.py b/tests/concurrency/test_leases.py index 6986b1919cde..2ffad8c54800 100644 --- a/tests/concurrency/test_leases.py +++ b/tests/concurrency/test_leases.py @@ -2,9 +2,16 @@ from unittest import mock from uuid import uuid4 +import httpx import pytest -from prefect.concurrency._leases import _lease_renewal_loop +from prefect.concurrency._leases import _lease_renewal_loop, _LeaseGoneError + + +def _make_http_status_error(status_code: int) -> httpx.HTTPStatusError: + request = httpx.Request("POST", "http://test/leases/renew") + response = httpx.Response(status_code, request=request) + return httpx.HTTPStatusError(f"{status_code}", request=request, response=response) async def test_lease_renewal_loop_renews_lease(): @@ -59,3 +66,54 @@ async def test_lease_renewal_loop_raises_after_max_retry_attempts(): # retry_async_fn with max_attempts=3 tries exactly 3 times assert mock_client.renew_concurrency_lease.call_count == 3 + + +async def test_lease_renewal_loop_raises_lease_gone_on_410(): + """A 410 response raises _LeaseGoneError immediately without any retries.""" + mock_client = mock.AsyncMock() + mock_client.renew_concurrency_lease.side_effect = _make_http_status_error(410) + + with ( + mock.patch("prefect.concurrency._leases.get_client") as mock_get_client, + mock.patch("asyncio.sleep", new_callable=mock.AsyncMock), + ): + mock_get_client.return_value.__aenter__.return_value = mock_client + with pytest.raises(_LeaseGoneError): + await _lease_renewal_loop(lease_id=uuid4(), lease_duration=10.0) + + # No retries — called exactly once + assert mock_client.renew_concurrency_lease.call_count == 1 + + +async def test_lease_renewal_loop_retries_on_non_410_http_error(): + """Non-410 HTTP errors (e.g. 
500) are retried up to 3 times like any other Exception.""" + mock_client = mock.AsyncMock() + mock_client.renew_concurrency_lease.side_effect = _make_http_status_error(500) + + with ( + mock.patch("prefect.concurrency._leases.get_client") as mock_get_client, + mock.patch("asyncio.sleep", new_callable=mock.AsyncMock), + ): + mock_get_client.return_value.__aenter__.return_value = mock_client + with pytest.raises(httpx.HTTPStatusError): + await _lease_renewal_loop(lease_id=uuid4(), lease_duration=10.0) + + # Retried up to max_attempts=3 + assert mock_client.renew_concurrency_lease.call_count == 3 + + +async def test_lease_renewal_loop_exits_cleanly_when_should_stop(): + """If should_stop() is True from the start, the loop exits without making any requests.""" + mock_client = mock.AsyncMock() + mock_client.renew_concurrency_lease.side_effect = _make_http_status_error(410) + + with ( + mock.patch("prefect.concurrency._leases.get_client") as mock_get_client, + mock.patch("asyncio.sleep", new_callable=mock.AsyncMock), + ): + mock_get_client.return_value.__aenter__.return_value = mock_client + await _lease_renewal_loop( + lease_id=uuid4(), lease_duration=10.0, should_stop=lambda: True + ) + + assert mock_client.renew_concurrency_lease.call_count == 0 diff --git a/tests/server/orchestration/test_core_policy.py b/tests/server/orchestration/test_core_policy.py index 3bea42602a9a..0f98942a5762 100644 --- a/tests/server/orchestration/test_core_policy.py +++ b/tests/server/orchestration/test_core_policy.py @@ -3827,27 +3827,15 @@ async def test_secure_concurrency_slots( ) assert ctx3.response_details.reason == "Deployment concurrency limit reached." 
- @pytest.mark.parametrize( - "client_version,expect_server_auto_release", - [ - ("3.6.22", True), # Old client: server auto-releases - ("3.6.23", False), # New client: server skips auto-release - (None, True), # No version: assume old client, server auto-releases - ], - ) async def test_release_concurrency_slots( self, session, initialize_orchestration, flow, - client_version, - expect_server_auto_release, ): """ - Test deployment concurrency lease release behavior for both old and new clients. - - Old clients (< 3.6.23 or None): Server auto-releases leases on terminal state transitions - New clients (>= 3.6.23): Server skips auto-release, clients explicitly release leases + Test that the server auto-releases deployment concurrency leases on terminal + state transitions. """ deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow @@ -3866,7 +3854,6 @@ async def test_release_concurrency_slots( "flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3893,7 +3880,6 @@ async def test_release_concurrency_slots( "flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) with mock.patch("prefect.server.orchestration.core_policy.now") as mock_now: @@ -3927,7 +3913,6 @@ async def test_release_concurrency_slots( deployment_id=deployment.id, run_override=ctx1.run, initial_details=ctx1.validated_state.state_details, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3952,7 +3937,6 @@ async def test_release_concurrency_slots( deployment_id=deployment.id, run_override=ctx1.run, initial_details=ctx1_running.validated_state.state_details, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -3961,63 +3945,26 @@ async def test_release_concurrency_slots( ) await ctx1_completed.validate_proposed_state() - # Check behavior based on client version - if 
expect_server_auto_release: - # OLD CLIENT BEHAVIOR: Server auto-releases the lease - lease_ids = await lease_storage.read_active_lease_ids() - assert len(lease_ids) == 0, ( - f"Old client ({client_version}): Server should auto-release lease" - ) + # Server always auto-releases the lease on terminal state transition + lease_ids = await lease_storage.read_active_lease_ids() + assert len(lease_ids) == 0 - # Second run can now acquire the slot immediately - ctx2_retry = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, - run_override=ctx2.run, - client_version=client_version, - ) - - async with contextlib.AsyncExitStack() as stack: - ctx2_retry = await stack.enter_async_context( - SecureFlowConcurrencySlots(ctx2_retry, *pending_transition) - ) - await ctx2_retry.validate_proposed_state() - - # Second run is accepted because server released the lease - assert ctx2_retry.response_status == SetStateStatus.ACCEPT - else: - # NEW CLIENT BEHAVIOR: Server does NOT auto-release, client must do it - assert ( - ctx1_completed.validated_state.state_details.deployment_concurrency_lease_id - is None # Server clears this in the response - ) - - # However, the actual lease still exists because server doesn't release it - lease_ids = await lease_storage.read_active_lease_ids() - assert len(lease_ids) == 1, ( - f"New client ({client_version}): Server should NOT auto-release lease" - ) + # Second run can now acquire the slot immediately + ctx2_retry = await initialize_orchestration( + session, + "flow", + *pending_transition, + deployment_id=deployment.id, + run_override=ctx2.run, + ) - # The second run will remain blocked because the lease wasn't released - ctx2_retry = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, - run_override=ctx2.run, - client_version=client_version, + async with contextlib.AsyncExitStack() as stack: + ctx2_retry = await stack.enter_async_context( 
+ SecureFlowConcurrencySlots(ctx2_retry, *pending_transition) ) + await ctx2_retry.validate_proposed_state() - async with contextlib.AsyncExitStack() as stack: - ctx2_retry = await stack.enter_async_context( - SecureFlowConcurrencySlots(ctx2_retry, *pending_transition) - ) - await ctx2_retry.validate_proposed_state() - - # Second run is still blocked because server didn't release the lease - assert ctx2_retry.response_status == SetStateStatus.REJECT + assert ctx2_retry.response_status == SetStateStatus.ACCEPT async def test_cancel_new_collision_strategy( self, @@ -4062,20 +4009,11 @@ async def test_cancel_new_collision_strategy( ) assert ctx2.response_details.reason == "Deployment concurrency limit reached." - @pytest.mark.parametrize( - "client_version,expect_server_auto_release", - [ - ("3.6.22", True), # Old client: server auto-releases - ("3.6.23", False), # New client: server skips auto-release - ], - ) async def test_enqueue_collision_strategy( self, session, initialize_orchestration, flow, - client_version, - expect_server_auto_release, ): deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow, schemas.core.ConcurrencyLimitStrategy.ENQUEUE @@ -4092,7 +4030,6 @@ async def test_enqueue_collision_strategy( "flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -4109,7 +4046,6 @@ async def test_enqueue_collision_strategy( "flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) with mock.patch("prefect.server.orchestration.core_policy.now") as mock_now: @@ -4140,7 +4076,6 @@ async def test_enqueue_collision_strategy( *completed_transition, deployment_id=deployment.id, run_override=ctx1.run, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -4149,14 +4084,13 @@ async def test_enqueue_collision_strategy( ) await ctx1_completed.validate_proposed_state() - # Behavior depends on 
client version + # Server always auto-releases; second run can proceed ctx2_retry = await initialize_orchestration( session, "flow", *pending_transition, deployment_id=deployment.id, run_override=ctx2.run, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: @@ -4165,12 +4099,8 @@ async def test_enqueue_collision_strategy( ) await ctx2_retry.validate_proposed_state() - if expect_server_auto_release: - # Old client: server released the lease, second run can proceed - assert ctx2_retry.response_status == SetStateStatus.ACCEPT - else: - # New client: server didn't release, second run still blocked - assert ctx2_retry.response_status == SetStateStatus.REJECT + # Server released the lease, second run can proceed + assert ctx2_retry.response_status == SetStateStatus.ACCEPT async def test_uses_enqueue_collision_strategy_by_default( self, @@ -4445,17 +4375,11 @@ async def test_multiple_deployments_with_different_concurrency_limits( await assert_deployment_concurrency_limit(session, deployment2, 2, 2) - @pytest.mark.parametrize( - "client_version,expect_server_auto_release", - [("3.6.22", True), ("3.6.23", False)], - ) async def test_flow_run_cancellation( self, session, initialize_orchestration, flow, - client_version, - expect_server_auto_release, ): deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow @@ -4476,7 +4400,6 @@ async def test_flow_run_cancellation( "flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx1 = await stack.enter_async_context( @@ -4491,7 +4414,6 @@ async def test_flow_run_cancellation( "flow", *cancelling_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4510,7 +4432,6 @@ async def test_flow_run_cancellation( "flow", *cancelled_transition, deployment_id=deployment.id, - 
client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx3 = await stack.enter_async_context( @@ -4518,21 +4439,15 @@ async def test_flow_run_cancellation( ) await ctx3.validate_proposed_state() - # Check if server auto-released based on client version - if expect_server_auto_release: - # Old client: Server auto-releases the slot - await assert_deployment_concurrency_limit(session, deployment, 1, 0) - else: - # New client: Server does NOT auto-release (client responsibility) - await assert_deployment_concurrency_limit(session, deployment, 1, 1) + # Server always auto-releases the slot + await assert_deployment_concurrency_limit(session, deployment, 1, 0) - # Second flow run behavior depends on whether slot was released + # Second flow run can proceed since slot was released ctx4 = await initialize_orchestration( session, "flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx4 = await stack.enter_async_context( @@ -4540,24 +4455,14 @@ async def test_flow_run_cancellation( ) await ctx4.validate_proposed_state() - if expect_server_auto_release: - # Old client: Slot was released, so new run can proceed - assert ctx4.response_status == SetStateStatus.ACCEPT - else: - # New client: Slot wasn't released by server, so new run is blocked - assert ctx4.response_status == SetStateStatus.REJECT + # Slot was released by server, so new run can proceed + assert ctx4.response_status == SetStateStatus.ACCEPT - @pytest.mark.parametrize( - "client_version,expect_server_auto_release", - [("3.6.22", True), ("3.6.23", False)], - ) async def test_pending_running_completed_releases_concurrency_slot( self, session, initialize_orchestration, flow, - client_version, - expect_server_auto_release, ): deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow @@ -4575,7 +4480,6 @@ async def test_pending_running_completed_releases_concurrency_slot( 
"flow", *pending_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx1 = await stack.enter_async_context( @@ -4592,7 +4496,6 @@ async def test_pending_running_completed_releases_concurrency_slot( "flow", *running_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4610,7 +4513,6 @@ async def test_pending_running_completed_releases_concurrency_slot( "flow", *completed_transition, deployment_id=deployment.id, - client_version=client_version, ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4618,13 +4520,8 @@ async def test_pending_running_completed_releases_concurrency_slot( ) await ctx2.validate_proposed_state() - # Check if server auto-released based on client version - if expect_server_auto_release: - # Old client: Server auto-releases the slot - await assert_deployment_concurrency_limit(session, deployment, 1, 0) - else: - # New client: Server does NOT auto-release (client responsibility) - await assert_deployment_concurrency_limit(session, deployment, 1, 1) + # Server always auto-releases the slot + await assert_deployment_concurrency_limit(session, deployment, 1, 0) async def test_error_handling( self, diff --git a/tests/test_flow_engine.py b/tests/test_flow_engine.py index ff0b90383928..4f9c9ea0db85 100644 --- a/tests/test_flow_engine.py +++ b/tests/test_flow_engine.py @@ -2683,7 +2683,7 @@ def foo(): run_flow(foo, flow_run) mock_maintain_concurrency_lease.assert_called_once_with( - ANY, 300, raise_on_lease_renewal_failure=True + ANY, 300, raise_on_lease_renewal_failure=True, should_stop=ANY ) async def test_lease_renewal_async( @@ -2718,7 +2718,7 @@ async def foo(): await run_flow(foo, flow_run) mock_maintain_concurrency_lease.assert_called_once_with( - ANY, 300, raise_on_lease_renewal_failure=True + ANY, 300, 
raise_on_lease_renewal_failure=True, should_stop=ANY ) From 06fc0ccb9626dcfd467688d39dbc96e6e8076a24 Mon Sep 17 00:00:00 2001 From: nas Date: Fri, 13 Mar 2026 11:40:19 +0100 Subject: [PATCH 3/5] revert changes to test_core_policy.py --- .../server/orchestration/test_core_policy.py | 80 +++++-------------- 1 file changed, 22 insertions(+), 58 deletions(-) diff --git a/tests/server/orchestration/test_core_policy.py b/tests/server/orchestration/test_core_policy.py index 0f98942a5762..d67e855ecd5d 100644 --- a/tests/server/orchestration/test_core_policy.py +++ b/tests/server/orchestration/test_core_policy.py @@ -3833,10 +3833,6 @@ async def test_release_concurrency_slots( initialize_orchestration, flow, ): - """ - Test that the server auto-releases deployment concurrency leases on terminal - state transitions. - """ deployment = await self.create_deployment_with_concurrency_limit( session, 1, flow ) @@ -3850,10 +3846,7 @@ async def test_release_concurrency_slots( # First run should be accepted ctx1 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: @@ -3876,10 +3869,7 @@ async def test_release_concurrency_slots( # Second run should be delayed ctx2 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) with mock.patch("prefect.server.orchestration.core_policy.now") as mock_now: @@ -3945,11 +3935,14 @@ async def test_release_concurrency_slots( ) await ctx1_completed.validate_proposed_state() - # Server always auto-releases the lease on terminal state transition + assert ( + ctx1_completed.validated_state.state_details.deployment_concurrency_lease_id + is None + ) lease_ids = await lease_storage.read_active_lease_ids() assert len(lease_ids) == 0 - # Second run can now acquire 
the slot immediately + # Now the second run should be accepted ctx2_retry = await initialize_orchestration( session, "flow", @@ -4026,10 +4019,7 @@ async def test_enqueue_collision_strategy( # First run should be accepted ctx1 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: @@ -4042,10 +4032,7 @@ async def test_enqueue_collision_strategy( # Second run should be enqueued ctx2 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) with mock.patch("prefect.server.orchestration.core_policy.now") as mock_now: @@ -4084,7 +4071,7 @@ async def test_enqueue_collision_strategy( ) await ctx1_completed.validate_proposed_state() - # Server always auto-releases; second run can proceed + # Now the second run should be accepted ctx2_retry = await initialize_orchestration( session, "flow", @@ -4099,7 +4086,6 @@ async def test_enqueue_collision_strategy( ) await ctx2_retry.validate_proposed_state() - # Server released the lease, second run can proceed assert ctx2_retry.response_status == SetStateStatus.ACCEPT async def test_uses_enqueue_collision_strategy_by_default( @@ -4396,10 +4382,7 @@ async def test_flow_run_cancellation( # Secure a concurrency slot ctx1 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx1 = await stack.enter_async_context( @@ -4410,10 +4393,7 @@ async def test_flow_run_cancellation( # Move to Cancelling state (should still hold the slot) ctx2 = await initialize_orchestration( - session, - "flow", - *cancelling_transition, - deployment_id=deployment.id, + session, "flow", 
*cancelling_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4426,12 +4406,9 @@ async def test_flow_run_cancellation( session, deployment, 1, 1 ) # Concurrency slot still held - # Move to Cancelled state + # Move to Cancelled state (should release the slot) ctx3 = await initialize_orchestration( - session, - "flow", - *cancelled_transition, - deployment_id=deployment.id, + session, "flow", *cancelled_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx3 = await stack.enter_async_context( @@ -4439,25 +4416,21 @@ async def test_flow_run_cancellation( ) await ctx3.validate_proposed_state() - # Server always auto-releases the slot await assert_deployment_concurrency_limit(session, deployment, 1, 0) - # Second flow run can proceed since slot was released + # Verify that the concurrency slot can be secured again ctx4 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx4 = await stack.enter_async_context( SecureFlowConcurrencySlots(ctx4, *pending_transition) ) await ctx4.validate_proposed_state() - - # Slot was released by server, so new run can proceed assert ctx4.response_status == SetStateStatus.ACCEPT + await assert_deployment_concurrency_limit(session, deployment, 1, 1) + async def test_pending_running_completed_releases_concurrency_slot( self, session, @@ -4476,10 +4449,7 @@ async def test_pending_running_completed_releases_concurrency_slot( # Secure a concurrency slot ctx1 = await initialize_orchestration( - session, - "flow", - *pending_transition, - deployment_id=deployment.id, + session, "flow", *pending_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx1 = await stack.enter_async_context( @@ -4492,10 +4462,7 @@ async 
def test_pending_running_completed_releases_concurrency_slot( # Move to running state ctx2 = await initialize_orchestration( - session, - "flow", - *running_transition, - deployment_id=deployment.id, + session, "flow", *running_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4509,10 +4476,7 @@ async def test_pending_running_completed_releases_concurrency_slot( # Now move to completed ctx2 = await initialize_orchestration( - session, - "flow", - *completed_transition, - deployment_id=deployment.id, + session, "flow", *completed_transition, deployment_id=deployment.id ) async with contextlib.AsyncExitStack() as stack: ctx2 = await stack.enter_async_context( @@ -4520,7 +4484,7 @@ async def test_pending_running_completed_releases_concurrency_slot( ) await ctx2.validate_proposed_state() - # Server always auto-releases the slot + # Slot is released await assert_deployment_concurrency_limit(session, deployment, 1, 0) async def test_error_handling( From 901f53eb279636b96214a138a7347496479171d9 Mon Sep 17 00:00:00 2001 From: nas Date: Fri, 13 Mar 2026 16:31:36 +0100 Subject: [PATCH 4/5] track flow executed flag in should_stop, structural simplifications to renewal and retries --- src/prefect/_internal/retries.py | 8 ++++ src/prefect/concurrency/_leases.py | 67 ++++++++++-------------------- src/prefect/flow_engine.py | 22 ++++++---- tests/_internal/test_retries.py | 49 ++++++++++++++++++++++ tests/concurrency/test_leases.py | 9 ++-- 5 files changed, 98 insertions(+), 57 deletions(-) diff --git a/src/prefect/_internal/retries.py b/src/prefect/_internal/retries.py index 552b40b9c4a7..4e349aa266b2 100644 --- a/src/prefect/_internal/retries.py +++ b/src/prefect/_internal/retries.py @@ -28,6 +28,7 @@ def retry_async_fn( max_delay: float = 10, retry_on_exceptions: tuple[type[Exception], ...] 
= (Exception,), operation_name: Optional[str] = None, + should_not_retry: Callable[[Exception], bool] = lambda e: False, ) -> Callable[ [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]] ]: @@ -44,6 +45,11 @@ def retry_async_fn( retrying on all exceptions. operation_name: Optional name to use for logging the operation instead of the function name. If None, uses the function name. + should_not_retry: An optional callable that takes the caught exception and + returns True if retries should be skipped immediately. Useful for + short-circuiting retries on non-transient errors (e.g. HTTP 410 Gone) + where retrying will never succeed. When it returns True the exception + is re-raised immediately without any backoff or retry logging. """ def decorator( @@ -56,6 +62,8 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: try: return await func(*args, **kwargs) except retry_on_exceptions as e: + if should_not_retry(e): + raise if attempt == max_attempts - 1: logger.exception( f"Function {name!r} failed after {max_attempts} attempts" diff --git a/src/prefect/concurrency/_leases.py b/src/prefect/concurrency/_leases.py index b408ddbcec91..cbff41966933 100644 --- a/src/prefect/concurrency/_leases.py +++ b/src/prefect/concurrency/_leases.py @@ -17,16 +17,6 @@ from prefect.logging.loggers import get_logger, get_run_logger -class _LeaseGoneError(BaseException): - """Raised when the server returns 410 Gone during lease renewal. - - Intentionally inherits from BaseException (not Exception) so that - @retry_async_fn(retry_on_exceptions=(Exception,)) never retries it. - A 410 means the lease was revoked or expired server-side and is not - a transient condition — retrying will never succeed. 
- """ - - async def _lease_renewal_loop( lease_id: UUID, lease_duration: float, @@ -45,18 +35,17 @@ async def _lease_renewal_loop( """ async with get_client() as client: - @retry_async_fn(max_attempts=3, operation_name="concurrency lease renewal") + @retry_async_fn( + max_attempts=3, + operation_name="concurrency lease renewal", + should_not_retry=lambda e: ( + isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 410 + ), + ) async def renew() -> None: - try: - await client.renew_concurrency_lease( - lease_id=lease_id, lease_duration=lease_duration - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == 410: - raise _LeaseGoneError( - f"Concurrency lease {lease_id} has expired or been revoked by the server." - ) from e - raise + await client.renew_concurrency_lease( + lease_id=lease_id, lease_duration=lease_duration + ) while True: # Exit cleanly if the caller signals that the flow is done. @@ -106,24 +95,18 @@ def handle_lease_renewal_failure(future: concurrent.futures.Future[None]): return exc = future.exception() if exc: - # If the caller signals that the flow is already done, a renewal - # failure is expected (the server released the lease during the - # terminal state transition). Suppress it rather than crashing. + try: + logger = get_run_logger() + except Exception: + logger = get_logger("concurrency") + if should_stop(): - try: - logger = get_run_logger() - except Exception: - logger = get_logger("concurrency") logger.debug( "Concurrency lease renewal failed after flow reached terminal state - this is expected.", exc_info=(type(exc), exc, exc.__traceback__), ) return - try: - # Use a run logger if available - logger = get_run_logger() - except Exception: - logger = get_logger("concurrency") + if raise_on_lease_renewal_failure: logger.error( "Concurrency lease renewal failed - slots are no longer reserved. 
Terminating execution to prevent over-allocation.", @@ -183,24 +166,18 @@ def handle_lease_renewal_failure(task: asyncio.Task[None]): return exc = task.exception() if exc: - # If the caller signals that the flow is already done, a renewal - # failure is expected (the server released the lease during the - # terminal state transition). Suppress it rather than crashing. + try: + logger = get_run_logger() + except Exception: + logger = get_logger("concurrency") + if should_stop(): - try: - logger = get_run_logger() - except Exception: - logger = get_logger("concurrency") logger.debug( "Concurrency lease renewal failed after flow reached terminal state - this is expected.", exc_info=(type(exc), exc, exc.__traceback__), ) return - try: - # Use a run logger if available - logger = get_run_logger() - except Exception: - logger = get_logger("concurrency") + if raise_on_lease_renewal_failure: logger.error( "Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation.", diff --git a/src/prefect/flow_engine.py b/src/prefect/flow_engine.py index dc5dc148c959..9e4b5ec1971b 100644 --- a/src/prefect/flow_engine.py +++ b/src/prefect/flow_engine.py @@ -836,10 +836,13 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): lease_id, 300, raise_on_lease_renewal_failure=True, - should_stop=lambda: bool( - self.flow_run - and self.flow_run.state - and self.flow_run.state.is_final() + should_stop=lambda: ( + self._flow_executed + or bool( + self.flow_run + and self.flow_run.state + and self.flow_run.state.is_final() + ) ), ) ) @@ -1449,10 +1452,13 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): lease_id, 300, raise_on_lease_renewal_failure=True, - should_stop=lambda: bool( - self.flow_run - and self.flow_run.state - and self.flow_run.state.is_final() + should_stop=lambda: ( + self._flow_executed + or bool( + self.flow_run + and self.flow_run.state + and self.flow_run.state.is_final() 
+ ) ), ) ) diff --git a/tests/_internal/test_retries.py b/tests/_internal/test_retries.py index 0d4376baa6ab..2dc15c418f52 100644 --- a/tests/_internal/test_retries.py +++ b/tests/_internal/test_retries.py @@ -110,3 +110,52 @@ async def eventual_success_func(): assert result == "Success" assert mock_func.call_count == 3 assert mock_sleep.call_count == 2 + + async def test_should_not_retry_short_circuits(self, mock_sleep): + """When should_not_retry returns True, raises immediately without retrying.""" + mock_func = AsyncMock(side_effect=ValueError("no retry")) + + @retry_async_fn(max_attempts=3, should_not_retry=lambda e: True) + async def fail_func(): + await mock_func() + + with pytest.raises(ValueError, match="no retry"): + await fail_func() + + assert mock_func.call_count == 1 + assert mock_sleep.call_count == 0 + + async def test_should_not_retry_false_does_not_affect_retries(self, mock_sleep): + """When should_not_retry returns False, normal retry behavior is preserved.""" + mock_func = AsyncMock(side_effect=ValueError("retry me")) + + @retry_async_fn(max_attempts=3, should_not_retry=lambda e: False) + async def fail_func(): + await mock_func() + + with pytest.raises(ValueError, match="retry me"): + await fail_func() + + assert mock_func.call_count == 3 + assert mock_sleep.call_count == 2 + + async def test_should_not_retry_only_short_circuits_matching_exceptions( + self, mock_sleep + ): + """should_not_retry inspects the exception — only short-circuits when it returns True.""" + mock_func = AsyncMock( + side_effect=[ValueError("retry this"), ValueError("stop here")] + ) + + @retry_async_fn( + max_attempts=5, + should_not_retry=lambda e: "stop" in str(e), + ) + async def mixed_func(): + await mock_func() + + with pytest.raises(ValueError, match="stop here"): + await mixed_func() + + assert mock_func.call_count == 2 # retried once, then short-circuited + assert mock_sleep.call_count == 1 diff --git a/tests/concurrency/test_leases.py 
b/tests/concurrency/test_leases.py index 2ffad8c54800..0bcfe741981f 100644 --- a/tests/concurrency/test_leases.py +++ b/tests/concurrency/test_leases.py @@ -5,7 +5,7 @@ import httpx import pytest -from prefect.concurrency._leases import _lease_renewal_loop, _LeaseGoneError +from prefect.concurrency._leases import _lease_renewal_loop def _make_http_status_error(status_code: int) -> httpx.HTTPStatusError: @@ -68,8 +68,8 @@ async def test_lease_renewal_loop_raises_after_max_retry_attempts(): assert mock_client.renew_concurrency_lease.call_count == 3 -async def test_lease_renewal_loop_raises_lease_gone_on_410(): - """A 410 response raises _LeaseGoneError immediately without any retries.""" +async def test_lease_renewal_loop_does_not_retry_on_410(): + """A 410 response raises httpx.HTTPStatusError immediately without any retries.""" mock_client = mock.AsyncMock() mock_client.renew_concurrency_lease.side_effect = _make_http_status_error(410) @@ -78,9 +78,10 @@ async def test_lease_renewal_loop_raises_lease_gone_on_410(): mock.patch("asyncio.sleep", new_callable=mock.AsyncMock), ): mock_get_client.return_value.__aenter__.return_value = mock_client - with pytest.raises(_LeaseGoneError): + with pytest.raises(httpx.HTTPStatusError) as exc_info: await _lease_renewal_loop(lease_id=uuid4(), lease_duration=10.0) + assert exc_info.value.response.status_code == 410 # No retries — called exactly once assert mock_client.renew_concurrency_lease.call_count == 1 From 92217bd31e216ff2e84fb106a00fc69239370e43 Mon Sep 17 00:00:00 2001 From: nas Date: Fri, 13 Mar 2026 16:41:45 +0100 Subject: [PATCH 5/5] don't catch BaseException as it's not needed anymore --- src/prefect/concurrency/_leases.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/concurrency/_leases.py b/src/prefect/concurrency/_leases.py index cbff41966933..31c2febbed80 100644 --- a/src/prefect/concurrency/_leases.py +++ b/src/prefect/concurrency/_leases.py @@ -204,6 +204,6 @@ def
handle_lease_renewal_failure(task: asyncio.Task[None]): lease_renewal_task.cancel() try: await lease_renewal_task - except (asyncio.CancelledError, Exception, BaseException): + except (asyncio.CancelledError, Exception): # Handling for errors will be done in the callback pass