Fix deployment concurrency slot lease release race condition #21014
ubiquitousbyte wants to merge 2 commits into PrefectHQ:main
Conversation
Fixes a critical race condition where the deployment concurrency `lease_id` is read from the wrong state object after `propose_state()` overwrites the flow run's state, causing slots to remain occupied until the TTL expires. Also prevents flows from being incorrectly marked as `Crashed` when a `BaseException` occurs during post-execution state transitions.

## Root Cause

### Race Condition (Issue PrefectHQ#17415)

In `FlowRunEngine.set_state()` (both sync and async):

1. Flow is in RUNNING state with `lease_id` in `state_details`
2. `propose_state(Completed())` returns the new COMPLETED state
3. `propose_state()` OVERWRITES `self.flow_run.state` with the new state
4. Code reads `lease_id` from `self.flow_run.state` (now COMPLETED with `lease_id=null`)
5. `release_concurrency_slots_with_lease()` is never called
6. The slot remains occupied until the 300s TTL expires
7. Other flows are stuck in the AwaitingConcurrencySlot state

### False Crash Detection (Issue PrefectHQ#19068)

When flows complete successfully but encounter infrastructure issues:

1. User code completes successfully
2. `set_state(Completed())` makes an API call
3. Lease renewal or an API timeout raises a `BaseException`
4. Existing code: "BaseException + not final state = crash!"
5. Flow is incorrectly marked as CRASHED despite successful execution

## Changes

### Client-Side Fix (flow_engine.py)

**Lease Release Timing Fix:**
- Capture `lease_id` from the current state BEFORE calling `propose_state()`
- Use the saved `lease_id` value for the release API call
- Applied to both sync (lines 507-545) and async (lines 1145-1180)

**False Crash Prevention:**
- Add a `_flow_executed` flag to track when user code finishes
- Set the flag in `handle_success()` after user code completes
- Update the `BaseException` handler to check the flag before marking the run as crashed
- Prevents infrastructure issues from masking successful executions

### Server-Side Fixes (core_policy.py)

- Fix `SecureFlowConcurrencySlots.cleanup()` to only decrement if a lease exists
- Restore `ReleaseFlowConcurrencySlots.after_transition()` with version checking

### Backwards Compatibility (dependencies.py + core_policy.py)

- Added `MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE = Version("3.6.23")`
- Old clients (< 3.6.23 or None): server auto-releases (prevents orphans)
- New clients (>= 3.6.23): client-side release only (avoids the race)
- Defaults to the old behavior when the version is unknown (safe default)

## Testing

- All 70 existing TestFlowConcurrencyLimits tests pass
- Parameterized 4 key tests for old/new client behaviors
- Added lease renewal failure resilience tests

Closes PrefectHQ#17415
Closes PrefectHQ#19068
Related: PrefectHQ#18942
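A minimal sketch of the lease-release timing fix, assuming simplified stand-ins for the engine and state objects (the class and method names mirror the description above but are illustrative, not the actual `prefect.flow_engine` internals):

```python
# Sketch of the lease-release timing fix: capture the lease id from the
# current state BEFORE propose_state() overwrites it. All names here are
# illustrative stand-ins, not the real Prefect implementation.
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StateDetails:
    deployment_concurrency_lease_id: Optional[str] = None


@dataclass
class State:
    name: str
    state_details: StateDetails = field(default_factory=StateDetails)


class FlowRunEngineSketch:
    def __init__(self, state: State):
        self.state = state
        self.released: list[str] = []

    def propose_state(self, new_state: State) -> State:
        # The server strips the lease id from terminal states, and
        # propose_state() replaces the engine's current state with the
        # server's response -- this is what caused the race.
        self.state = new_state
        return new_state

    def release_lease(self, lease_id: str) -> None:
        self.released.append(lease_id)

    def set_state(self, new_state: State) -> None:
        # FIX: read the lease id from the *current* (RUNNING) state
        # before propose_state() overwrites it with a COMPLETED state
        # whose lease id is None.
        lease_id = self.state.state_details.deployment_concurrency_lease_id
        self.propose_state(new_state)
        if lease_id is not None:
            self.release_lease(lease_id)


engine = FlowRunEngineSketch(State("RUNNING", StateDetails("lease-123")))
engine.set_state(State("COMPLETED"))
print(engine.released)  # → ['lease-123']
```

Reading the lease id after `propose_state()` instead would always yield `None`, leaving the slot occupied until the TTL expires.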
@zzstoatzz, @desertaxle, can you guys please have a look?
desertaxle
left a comment
Thanks for the PR and the great write-up, @ubiquitousbyte!
I think there's a simpler solution available that doesn't involve any server-side changes. We had a similar race condition with flow run heartbeats, which we solved by giving the heartbeat loop a reference to the engine so it could check the current engine state before sending a heartbeat (you can see how that works in `send_heartbeats_async` in `src/prefect/flow_engine.py`). I think a similar pattern could work great for concurrency slot renewal. It might require adding a callback check to `maintain_concurrency_lease` and `amaintain_concurrency_lease`, but it seems worth it to fix this race condition.
Let me know what you think!
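The callback-check pattern desertaxle describes might look something like the following sketch. This is a guess at the shape of the idea, not the real `maintain_concurrency_lease` helper: the renewal loop holds a reference to the engine and stops renewing once the run is past its running phase, instead of racing the server's release.

```python
# Illustrative sketch of a renewal loop that checks engine state before
# each renewal, modeled on the heartbeat-loop pattern mentioned above.
# `EngineSketch` and `maintain_lease` are assumptions, not Prefect APIs.
import threading
import time


class EngineSketch:
    def __init__(self) -> None:
        self.is_running = True  # flipped off when the run finishes


def maintain_lease(lease_id: str, engine: EngineSketch,
                   renewals: list, interval: float = 0.01) -> None:
    while True:
        time.sleep(interval)
        # Callback-style check: if the engine has moved past the
        # running phase, exit instead of renewing a lease the server
        # may be about to (or already did) revoke.
        if not engine.is_running:
            return
        renewals.append(lease_id)


engine = EngineSketch()
renewals: list[str] = []
t = threading.Thread(target=maintain_lease, args=("lease-1", engine, renewals))
t.start()
time.sleep(0.05)          # flow body "executes"
engine.is_running = False  # flow reached a terminal state
t.join()
```

The key property is that the loop consults the engine rather than blindly renewing on a timer, which is what makes the renewal-vs-release race disappear.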
Enables workaround for #18894
Closes #19068
Context
Last year, I introduced Prefect as the primary orchestration tool for data pipelines at the company I currently work at, Remerge. We are self-hosting Prefect on bare-metal machines and are using the `docker` worker type to schedule our flows.

Our setup has been a pretty lean MVP: 1 Prefect Server and 2 Prefect Workers with a PostgreSQL database hosted in the cloud. We host 50 deployments owned by multiple engineering teams, and adoption has been good, so we want to scale out our deployment to multiple Prefect Servers and do some upgrades. A couple of months ago, we bumped our Prefect version from `3.0.3` to `3.4.17` and started encountering some regressions.

The most notable issue for us has been #18894. We've had to manually deduplicate datasets produced by our flows and establish workaround processes for downstream teams to follow. This is quite time-consuming work for our engineering department, and I'd like to put an end to it. As the person who argued for Prefect over Airflow to begin with, I feel responsible for making this happen. Hence why this PR exists.
I naively thought that setting a concurrency limit of 1 at the deployment level would prevent the duplication issue. So I set up a multi-server Prefect cluster (with Redis for lease storage) in https://github.com/ubiquitousbyte/prefect-homelab for experimentation, with a very basic deployment for a flow that prints `Hello, World!` to the screen and a concurrency limit set to 1. Then I got hit by #19068, with the primary difference being that the failure to renew the concurrency lease was not triggered by a timeout.

Concurrency lease renewal bug
A similar issue has been brought up in #19068.
This is an explanation of what I see happening. The changes introduced here have completely eliminated the issue in my multi-server setup.
I believe there is a race condition between two concurrent operations:
Timeline of the race:
1. The flow's user code finishes and the client calls `set_state(Completed())`, which triggers an API call to the server.
2. The server transitions the flow run from `RUNNING` to `COMPLETED`.
3. On that transition, the server-side `ReleaseFlowConcurrencySlots.after_transition()` function fires and revokes the lease.
4. Meanwhile, the client's lease renewal loop is still running. It attempts to renew the now-revoked lease, the renewal fails, and the flow gets marked as `CRASHED`, even though the underlying infrastructure executed it successfully.

How is this fixed?
Instead of relying on server auto-release, the client now explicitly releases the lease after successful state transition completes. This is a backwards-incompatible behavior change, so I've added server-side version detection:
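A dependency-free sketch of what that version gate could look like. The constant name comes from the PR description; the tiny tuple-based parser stands in for `packaging.version.Version`, and `server_should_auto_release` is a hypothetical helper, not the actual code in `dependencies.py`:

```python
# Sketch of the backwards-compatibility gate: old or unknown clients
# keep the server-side auto-release; new clients release client-side.
# _parse stands in for packaging.version.Version for this sketch.
from typing import Optional


def _parse(version: str) -> tuple:
    return tuple(int(part) for part in version.split("."))


MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE = _parse("3.6.23")


def server_should_auto_release(client_version: Optional[str]) -> bool:
    """Old clients (< 3.6.23) rely on server-side auto-release to avoid
    orphaned slots; newer clients release the lease themselves, which
    avoids the renewal-vs-release race."""
    if client_version is None:
        return True  # safe default when the client version is unknown
    return _parse(client_version) < MIN_CLIENT_VERSION_FOR_CLIENT_SIDE_LEASE_RELEASE
```

Defaulting to the old behavior when no version is reported means a misbehaving or ancient client can never leave a slot permanently occupied.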
With this change, the renewal loop exits cleanly in its `finally` block before the client releases the lease, eliminating the race condition. With this, I can use a deployment concurrency limit to avoid duplicate schedules.

One interesting gotcha with the new implementation:
- `deployment_concurrency_lease_id` is only present in RUNNING states
- On the transition RUNNING → COMPLETED, the server returns a COMPLETED state without the `lease_id` (it's not relevant to terminal states)
- Because `propose_state()` runs first, the code overwrites `self.flow_run.state` with the new COMPLETED state
- Reading the `lease_id` from the state after this point always gets `None`, and as a result, concurrency slots would remain occupied until the TTL expires
- The fix is to capture the `lease_id` from the current state before calling `propose_state()`

This PR does not fix the root cause of schedule duplication. That's a separate scheduling bug that needs investigation. This PR provides a working defense mechanism against duplicate execution with deployment concurrency limits.
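The other half of the PR, the false-crash guard, can be sketched as below. The `_flow_executed` flag name comes from the PR description; everything else (the engine class, method shapes) is illustrative, not the real `flow_engine.py`:

```python
# Sketch of the _flow_executed guard: once user code has finished,
# a BaseException raised during post-execution state transitions
# (lease renewal failure, API timeout) must not flip the run to
# CRASHED. Class and method shapes are illustrative.
class CrashGuardSketch:
    def __init__(self) -> None:
        self._flow_executed = False
        self.final_state = None

    def handle_success(self) -> None:
        # Set in handle_success() once user code completes.
        self._flow_executed = True

    def run(self, user_code, post_transition) -> None:
        try:
            user_code()
            self.handle_success()
            post_transition()  # e.g. the set_state(Completed()) API call
            self.final_state = "COMPLETED"
        except BaseException:
            # Only mark as crashed if user code never finished;
            # infrastructure hiccups after success are not crashes.
            if not self._flow_executed:
                self.final_state = "CRASHED"
            else:
                self.final_state = "COMPLETED"


def flaky_transition():
    raise TimeoutError("lease renewal timed out")


engine = CrashGuardSketch()
engine.run(lambda: None, flaky_transition)
print(engine.final_state)  # → COMPLETED despite the post-run exception
```

Without the flag, the `except BaseException` branch would unconditionally report a crash, which is exactly the false CRASHED marking described in #19068.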