Commit 4421582

fix(engine): close concurrent preempt race in target-state ownership transfer (#1994)
When two components race to own the same target state, the prior fix relied on tokio scheduling to serialize their `pre_commit` -> `sink_apply` -> `commit` sequences. That holds for LMDB (microsecond ops) but breaks under PG latency, where the loser's `delete` lands after the winner's `upsert` and the target state is lost.

Add a `pending_process_token: Option<u128>` to `TargetStateInfoItem`, written by `pre_commit` whenever it queues a sink action and cleared by `commit_in_txn`'s retention pass. A detection sub-pass at the top of `pre_commit` peeks the token on every preempt-source item:

- live (same process token) -> return `PendingRetry` with the unconsumed declared map so `submit()` can re-invoke after a backoff;
- dead (crashed prior process) -> force `prev_may_be_missing=true` on reconcile.

The detection sub-pass also caches old-owner `tracking_info` bytes into a per-call `HashMap<StablePath, Vec<u8>>` shared with the Phase 1 preempt branch: each owner is read once, modified in place across multiple preempts, and emitted as a single `DeferredWrite` at the end of Phase 1. `set_provider_generation` (OnceLock-backed, can't run twice) is deferred until after the last possible `PendingRetry` exit.

On `sink_apply` / `commit_in_txn` failure, `rollback_pending_tokens` re-reads `tracking_info` and clears every token matching this process's token. The rollback is retried indefinitely with logged backoff until it succeeds. The process-exits-mid-rollback case is covered by the dead-token recovery path on next startup.

`contained_target_state_paths` is wrapped in `Arc` to avoid a full HashSet rehash on every retry iteration.

See `specs/target_state_ownership_transfer/concurrent_preempt_race_fix.md`.
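The live/dead token check described in this message can be sketched roughly as follows. This is a minimal illustration, not the code from the commit: `TokenState`, `Detection`, `classify`, and `detect` are hypothetical names, paths are plain `String`s rather than `StablePath`, and the per-item `prev_may_be_missing` handling is collapsed into a single flag for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical classification of a `pending_process_token` value.
#[derive(Debug, PartialEq, Eq)]
enum TokenState {
    /// No sink action is pending on this item.
    Clear,
    /// Token matches the current process: another in-flight commit owns it.
    Live,
    /// Token belongs to a different process, presumably one that crashed.
    Dead,
}

/// Compare the stored token against the current process token.
fn classify(pending: Option<u128>, current: u128) -> TokenState {
    match pending {
        None => TokenState::Clear,
        Some(t) if t == current => TokenState::Live,
        Some(_) => TokenState::Dead,
    }
}

/// Hypothetical result of the detection sub-pass.
#[derive(Debug, PartialEq, Eq)]
enum Detection {
    /// At least one preempt-source item is live: the caller should back off and retry.
    PendingRetry,
    /// No live tokens; `prev_may_be_missing` is true if any dead token was seen.
    Proceed { prev_may_be_missing: bool },
}

/// Peek the token on every preempt-source item (keyed here by a plain path string).
fn detect(items: &HashMap<String, Option<u128>>, current: u128) -> Detection {
    let mut prev_may_be_missing = false;
    for pending in items.values() {
        match classify(*pending, current) {
            // A single live token is enough to abort this attempt.
            TokenState::Live => return Detection::PendingRetry,
            // A dead token means the previous sink state cannot be trusted.
            TokenState::Dead => prev_may_be_missing = true,
            TokenState::Clear => {}
        }
    }
    Detection::Proceed { prev_may_be_missing }
}
```

In this sketch a live token short-circuits before any state is mutated, which mirrors the message's point that one-shot initialization such as `set_provider_generation` has to wait until no further `PendingRetry` exit is possible.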
1 parent 92cf6c3 commit 4421582

3 files changed

Lines changed: 375 additions & 77 deletions

rust/core/src/engine/environment.rs

Lines changed: 12 additions & 0 deletions
@@ -26,6 +26,12 @@ struct EnvironmentInner<Prof: EngineProfile> {
     /// See `specs/memo_validation/plan.md` → "Extension: state validation
     /// for tracked context values".
     context_initial_states: RwLock<HashMap<Fingerprint, Vec<Prof::FunctionData>>>,
+    /// Per-process liveness token. Used by `pre_commit` to distinguish a
+    /// pending tracking-info entry written by this process (live → caller
+    /// backs off and retries) from one left behind by a crashed prior
+    /// process (dead → take the recovery path).
+    /// See `specs/target_state_ownership_transfer/concurrent_preempt_race_fix.md`.
+    process_token: u128,
 }

 #[derive(Clone)]
@@ -47,10 +53,16 @@ impl<Prof: EngineProfile> Environment<Prof> {
             host_runtime_ctx,
             logic_set: RwLock::new(HashSet::new()),
             context_initial_states: RwLock::new(HashMap::new()),
+            process_token: uuid::Uuid::new_v4().as_u128(),
         });
         Ok(Self { inner: state })
     }

+    /// Liveness token for the current process. See `EnvironmentInner::process_token`.
+    pub fn process_token(&self) -> u128 {
+        self.inner.process_token
+    }
+
     pub fn storage(&self) -> &Storage {
         &self.inner.storage
     }
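
As a rough illustration of the token's lifetime, the std-only sketch below shows that clones of one environment share a token while a separately constructed environment gets a fresh one. It assumes `inner` is an `Arc` (the diff does not show the field type), drops the `Prof` type parameter and all other fields, and uses a hypothetical `random_u128` helper as a stand-in for `uuid::Uuid::new_v4().as_u128()`.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::sync::Arc;

/// Hypothetical std-only stand-in for `uuid::Uuid::new_v4().as_u128()`.
/// Each `RandomState::new()` carries fresh random keys, so the two halves
/// differ between calls with overwhelming probability.
fn random_u128() -> u128 {
    let hi = RandomState::new().build_hasher().finish() as u128;
    let lo = RandomState::new().build_hasher().finish() as u128;
    (hi << 64) | lo
}

/// Simplified inner state: only the liveness token is kept.
struct EnvironmentInner {
    process_token: u128,
}

/// Simplified environment; `Arc` sharing is an assumption of this sketch.
#[derive(Clone)]
struct Environment {
    inner: Arc<EnvironmentInner>,
}

impl Environment {
    /// Generate the token once, at construction time.
    fn new() -> Self {
        let state = Arc::new(EnvironmentInner {
            process_token: random_u128(),
        });
        Self { inner: state }
    }

    /// Liveness token shared by every clone of this environment.
    fn process_token(&self) -> u128 {
        self.inner.process_token
    }
}
```

Because the token is generated at construction and never persisted as part of the environment, a restarted process would observe any token left in storage by its predecessor as foreign, which is what lets `pre_commit` treat such entries as dead.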
