Commit d349a47

refactor(engine): move pending_process_token to component level + close rollback gap (#1996)
Moves `pending_process_token` from `TargetStateInfoItem` to `StablePathEntryTrackingInfo` and replaces the per-item dead-token detection with the existing multi-state signal (`item.states.len() > 1`).

Storage win: during a pre_commit's write-to-commit window we now write one token per *component* instead of one per *modified item* (~22 bytes each at the msgpack level). At rest, both representations are zero bytes -- the field is `skip_serializing_if = "Option::is_none"`.

Correctness win: the previous design lost its dead-token signal after `rollback_pending_tokens` cleared the per-item field. If lifecycle 1 left items as `[(old, new)]`, sink_apply failed, rollback cleared tokens -- but lifecycle 2 declaring the same value would compute `prev_may_be_missing = false` (no token, no deleted state), and reconcile would short-circuit. The sink would stay at `old` while tracking thought it was `new`.

The new design uses multi-state itself as the force-missing signal: `prev_may_be_missing = full_reprocess || schema_mismatch || item.is_pending()`. This subsumes both crashed-prior-process (token from a dead process, multi-state on disk) and rolled-back (no token, multi-state still on disk) cases.

Detection sub-pass simplifies to: read old-owner tracking_info; if `component.pending_process_token == self AND item.is_pending()`, abort with PendingRetry. The per-item filter prevents over-conservative back-off when C1's pre_commit modified a different item than C2 wants to preempt.

The own-tracking detection branch is gone -- any dead/rolled-back state in our own tracking is picked up uniformly through `is_pending()` in the main pass.

Also unifies the two msgpack deserializations of own tracking_info into one -- detection and the main pass share the same `let mut tracking_info` binding.

Spec docs updated: specs/core/internal_states.md sections 2 (schema), 3.2 (pre_commit), and 5.2 (sink exceptions).
1 parent 1f5befa commit d349a47
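
The rollback gap described in the commit message can be illustrated with a small standalone sketch. Everything here is a simplified stand-in rather than the engine's real `db_schema` types: `Item`, `State`, `prev_may_be_missing_old`, and `prev_may_be_missing_new` are hypothetical names, states are reduced to string labels, and `full_reprocess` / `schema_mismatch` are passed in as plain booleans.

```rust
/// Simplified stand-in for a tracked state (assumption: the real enum
/// carries serialized bytes, not a string label).
#[derive(Clone, Debug, PartialEq)]
enum State {
    Value(&'static str),
    Deleted,
}

/// Hypothetical, reduced model of a tracked item in the OLD layout,
/// where the token lived on each item.
struct Item {
    /// (version, state) pairs; more than one entry means an unsettled push.
    states: Vec<(u64, State)>,
    /// Old design only: per-item token, cleared by rollback.
    pending_process_token: Option<u128>,
}

impl Item {
    /// Multi-state signal used by the new design.
    fn is_pending(&self) -> bool {
        self.states.len() > 1
    }
}

/// Old rule for own-tracking items (sketch): any leftover token, or any
/// deleted state, forces the "may be missing" path.
fn prev_may_be_missing_old(item: &Item, full_reprocess: bool, schema_mismatch: bool) -> bool {
    full_reprocess
        || schema_mismatch
        || item.pending_process_token.is_some()
        || item.states.iter().any(|(_, s)| *s == State::Deleted)
}

/// New rule (sketch): multi-state alone is the signal, so it survives a
/// rollback that cleared the token.
fn prev_may_be_missing_new(item: &Item, full_reprocess: bool, schema_mismatch: bool) -> bool {
    full_reprocess || schema_mismatch || item.is_pending()
}
```

With an item left as `[(1, old), (2, new)]` and its token already cleared by rollback, the old rule returns `false` (reconcile would short-circuit) while the new rule returns `true`.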

2 files changed

Lines changed: 92 additions & 81 deletions

rust/core/src/engine/execution.rs

Lines changed: 65 additions & 68 deletions
@@ -344,9 +344,10 @@ impl<Prof: EngineProfile> Committer<Prof> {
         }
         // Prune entries with empty states and collect their paths for
         // inverted tracking cleanup (deferred until tracking_info is dropped).
-        // Surviving entries get their `pending_process_token` cleared —
-        // the lifecycle (pre_commit → sink_apply → commit) is succeeding,
-        // so any token written by pre_commit is no longer "pending".
+        // The component-level `pending_process_token` is cleared below
+        // (after this retention pass) — the lifecycle (pre_commit →
+        // sink_apply → commit) is succeeding, so any token written by
+        // pre_commit is no longer "pending".
         let mut pruned_paths: HashSet<TargetStatePath> = HashSet::new();
         tracking_info
             .target_state_items
@@ -355,10 +356,10 @@ impl<Prof: EngineProfile> Committer<Prof> {
                     pruned_paths.insert(path_with_pid.target_state_path.clone());
                     false
                 } else {
-                    item.pending_process_token = None;
                     true
                 }
             });
+        tracking_info.pending_process_token = None;
         // Don't delete inverted tracking if a surviving entry shares the same
         // target_state_path (can happen when provider_id changed — old entry
         // pruned, new entry survives under different provider_id).
@@ -793,37 +794,36 @@ async fn pre_commit<Prof: EngineProfile>(

     let mut id_reservation = IdReservation::new(&TARGET_ID_KEY);
     let tracking_info_bytes = app_store.read_tracking_info(wtxn, stable_path).await?;
+    let mut tracking_info: Option<db_schema::StablePathEntryTrackingInfo<'_>> = tracking_info_bytes
+        .as_deref()
+        .map(from_msgpack_slice)
+        .transpose()?;

     // Detection sub-pass — runs before any `TargetStateValue` is consumed by
     // reconcile, so a `PendingRetry` return leaves the input `declared_target_states`
-    // intact and the surrounding txn write-free for the retry. Two things happen:
-    //   1. Own tracking_info items: a non-None token must be dead, since only
-    //      one lifecycle per component_path runs at a time. Flag the path for
-    //      `prev_may_be_missing = true` on the main pass.
-    //   2. Each declared target_state_path that would go through the preempt
-    //      branch: read the old owner's tracking_info; peek the matching
-    //      item's token. Live → PendingRetry. Dead → flag for force-missing.
+    // intact and the surrounding txn write-free for the retry.
+    //
+    // We're only looking for one thing: a *live* in-flight pre_commit from
+    // this process on an old owner whose item we want to preempt. The signal
+    // is `old.tracking.pending_process_token == self AND item.is_pending()`
+    // — the component-level token says the lifecycle is in flight, the
+    // per-item multi-state signal filters to just the items that lifecycle
+    // actually touched. Without the per-item filter, C2 would back off
+    // preempting item I from C1 even when C1's pre_commit only modified
+    // item J — over-conservative.
+    //
+    // Crashed-prior-process and rolled-back states are *not* detected here.
+    // Both leave multi-state items on disk (a token from a dead process, or
+    // no token after `rollback_pending_tokens` ran), and the main pass picks
+    // them up uniformly via `prev_item.is_pending()` → force
+    // `prev_may_be_missing = true` on reconcile.
     //
     // Old-owner tracking_info bytes read here are cached and reused by the
-    // Phase 1 preempt branch — the main pass deserializes, modifies, and
-    // re-serializes into the same cache slot, so multiple preempts from the
-    // same old owner accumulate in memory and produce a single deferred
-    // write at the end of pre_commit.
-    let mut dead_token_paths: HashSet<TargetStatePath> = HashSet::new();
+    // Phase 1 preempt branch: deserialize, modify, re-serialize into the
+    // same slot, emit one deferred write per modified owner at the end.
     let mut old_tracking_cache: HashMap<StablePath, Vec<u8>> = HashMap::new();
     let mut pending_retry = false;
     {
-        let own_tracking: Option<db_schema::StablePathEntryTrackingInfo<'_>> = tracking_info_bytes
-            .as_deref()
-            .map(from_msgpack_slice)
-            .transpose()?;
-        if let Some(own) = &own_tracking {
-            for (path_with_pid, item) in &own.target_state_items {
-                if item.pending_process_token.is_some() {
-                    dead_token_paths.insert(path_with_pid.target_state_path.clone());
-                }
-            }
-        }
         // Materialize keys into an owned Vec so the iterator doesn't borrow
         // `declared_target_states` across the awaits below — the map's
         // values (`TargetStateValue`) are `!Sync`, which would otherwise
@@ -837,7 +837,7 @@ async fn pre_commit<Prof: EngineProfile>(
                 target_state_path: target_state_path.clone(),
                 provider_id: parent_provider_gen.map(|g| g.provider_id),
             };
-            if own_tracking
+            if tracking_info
                 .as_ref()
                 .is_some_and(|t| t.target_state_items.contains_key(&lookup_key))
             {
@@ -863,16 +863,12 @@ async fn pre_commit<Prof: EngineProfile>(
             }
             let cached = &old_tracking_cache[&owner_info.component_path];
             let old: db_schema::StablePathEntryTrackingInfo<'_> = from_msgpack_slice(cached)?;
-            if let Some(item) = old.target_state_items.get(&lookup_key) {
-                match item.pending_process_token {
-                    Some(t) if t == process_token => {
+            if old.pending_process_token == Some(process_token) {
+                if let Some(item) = old.target_state_items.get(&lookup_key) {
+                    if item.is_pending() {
                         pending_retry = true;
                         break;
                     }
-                    Some(_) => {
-                        dead_token_paths.insert(target_state_path);
-                    }
-                    None => {}
                 }
             }
         }
@@ -883,10 +879,6 @@ async fn pre_commit<Prof: EngineProfile>(
         });
     }
     let mut modified_old_owners: HashSet<StablePath> = HashSet::new();
-    let mut tracking_info: Option<db_schema::StablePathEntryTrackingInfo<'_>> = tracking_info_bytes
-        .as_deref()
-        .map(from_msgpack_slice)
-        .transpose()?;
     // Deferred DB writes that will be flushed after tracking_info is dropped,
     // since tracking_info borrows from wtxn and prevents mutable DB operations.
     let mut deferred_writes: Vec<DeferredWrite> = Vec::new();
@@ -1002,19 +994,18 @@ async fn pre_commit<Prof: EngineProfile>(
             };

             // Compute prev_states and prev_may_be_missing uniformly from prev_item.
-            // A dead-token flag on this path (from the detection sub-pass) means
-            // the prior lifecycle that wrote prev_states crashed before its
-            // sink_apply / commit could finish — so the sink may not reflect
-            // what's tracked. Force prev_may_be_missing = true to reflect that.
+            // `prev_item.is_pending()` (multi-state) means the prior lifecycle's
+            // sink_apply / commit didn't finish — could be a crash on a different
+            // process or a `rollback_pending_tokens` after a sink_apply failure
+            // here. In either case the sink may not reflect what's tracked, so
+            // force `prev_may_be_missing = true`.
             let (prev_states, prev_may_be_missing) = if let Some(ref prev_item) = prev_item {
                 let schema_version_mismatch = match parent_provider_gen {
                     Some(pg) => prev_item.provider_schema_version != pg.provider_schema_version,
                     None => false,
                 };
-                let prev_may_be_missing = full_reprocess
-                    || schema_version_mismatch
-                    || dead_token_paths.contains(&target_state_path)
-                    || prev_item.states.iter().any(|(_, s)| s.is_deleted());
+                let prev_may_be_missing =
+                    full_reprocess || schema_version_mismatch || prev_item.is_pending();
                 let prev_states = prev_item
                     .states
                     .iter()
@@ -1091,7 +1082,6 @@ async fn pre_commit<Prof: EngineProfile>(
                             None => db_schema::TargetStateInfoItemState::Deleted,
                         },
                     ));
-                    item.pending_process_token = Some(process_token);
                 } else if let Some(new_state) = new_state_bytes {
                     // Insert new item.
                     prev_item = Some(db_schema::TargetStateInfoItem {
@@ -1107,7 +1097,6 @@ async fn pre_commit<Prof: EngineProfile>(
                         ],
                         provider_schema_version: 0,
                         provider_generation,
-                        pending_process_token: Some(process_token),
                     });
                 }
             } else if let Some(item) = &mut prev_item {
@@ -1178,8 +1167,7 @@ async fn pre_commit<Prof: EngineProfile>(
             .map(|s_bytes| Prof::TargetStateTrackingRecord::from_bytes(s_bytes))
             .collect::<Result<Vec<_>>>()?;

-        let prev_may_be_missing = prev_may_be_missing
-            || dead_token_paths.contains(&target_state_path_with_pid.target_state_path);
+        let prev_may_be_missing = prev_may_be_missing || item.is_pending();
         let recon_output = target_states_provider
             .handler()
             .ok_or_else(|| {
@@ -1206,7 +1194,6 @@ async fn pre_commit<Prof: EngineProfile>(
                     None => db_schema::TargetStateInfoItemState::Deleted,
                 },
             ));
-            item.pending_process_token = Some(process_token);
         } else {
             for (version, _) in item.states.iter_mut() {
                 *version = curr_version;
@@ -1219,6 +1206,17 @@ async fn pre_commit<Prof: EngineProfile>(
         tracking_info.target_state_items.insert(path_with_pid, item);
     }

+    // Mark the component as in-flight if we queued any sink action; else
+    // clear the slot (no-op if it was already None, but also wipes a stale
+    // token from a prior crashed lifecycle now that the current pre_commit
+    // has rewritten the items). On success this is cleared by
+    // `commit_in_txn`; on sink/commit failure, `rollback_pending_tokens`.
+    tracking_info.pending_process_token = if actions_by_sinks.is_empty() {
+        None
+    } else {
+        Some(process_token)
+    };
+
     let data_bytes = rmp_serde::to_vec_named(&tracking_info)?;
     drop(tracking_info); // Release borrow before mutable operations.
     app_store
@@ -1479,17 +1477,22 @@ pub(crate) async fn submit<Prof: EngineProfile>(
     })
 }

-/// Clear every `pending_process_token` on `comp_ctx`'s tracking_info that
-/// matches the current process's token. Called when pre_commit succeeded but
-/// the subsequent sink_apply / commit failed: without this, the tokens
-/// pre_commit wrote would deadlock any future pre_commit in this process that
-/// touches an overlapping path (live-token branch in the detection sub-pass).
+/// Clear `comp_ctx`'s tracking_info `pending_process_token` if it matches
+/// the current process's token. Called when pre_commit succeeded but the
+/// subsequent sink_apply / commit failed: without this, the token pre_commit
+/// wrote would deadlock any future pre_commit in this process that touches
+/// an overlapping path (live-token branch in the detection sub-pass).
+///
+/// Items the failed pre_commit modified retain their multi-state shape on
+/// disk; the next pre_commit's main pass picks them up via
+/// `prev_item.is_pending()` → force `prev_may_be_missing = true`, so the
+/// sink-tracking divergence the failure may have caused gets re-reconciled.
 ///
 /// Retried indefinitely with exponential backoff — every failure is logged
 /// but the function does not return until the cleanup succeeds. If the
-/// process exits while this is still retrying, the tokens become "dead" from
-/// the next process's perspective and the dead-token recovery branch in
-/// pre_commit's detection sub-pass takes over.
+/// process exits while this is still retrying, the remaining multi-state
+/// items still flag themselves to the next process via the same
+/// `is_pending()` check.
 async fn rollback_pending_tokens<Prof: EngineProfile>(
     comp_ctx: &ComponentProcessorContext<Prof>,
     process_token: u128,
@@ -1509,16 +1512,10 @@ async fn rollback_pending_tokens<Prof: EngineProfile>(
         let encoded = {
             let mut tracking_info: db_schema::StablePathEntryTrackingInfo<'_> =
                 from_msgpack_slice(&bytes)?;
-            let mut changed = false;
-            for item in tracking_info.target_state_items.values_mut() {
-                if item.pending_process_token == Some(process_token) {
-                    item.pending_process_token = None;
-                    changed = true;
-                }
-            }
-            if !changed {
+            if tracking_info.pending_process_token != Some(process_token) {
                 return Ok(());
             }
+            tracking_info.pending_process_token = None;
             rmp_serde::to_vec_named(&tracking_info)?
         };
         app_store
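
The token lifecycle touched by the `execution.rs` hunks above (written at the end of pre_commit, checked by the detection sub-pass, cleared by rollback) can be sketched in isolation. This is a reduced model under stated assumptions, not the engine's code: `OwnerTracking`, `Detection`, `detect`, `token_after_pre_commit`, and `rollback` are hypothetical names, item paths are plain strings, and each item is represented only by its number of tracked states.

```rust
use std::collections::BTreeMap;

/// Outcome of the detection check for one preempt candidate (hypothetical enum).
#[derive(Debug, PartialEq)]
enum Detection {
    /// A live lifecycle in this process touched the item: back off and retry.
    PendingRetry,
    /// Safe to continue; the main pass handles any leftover multi-state.
    Proceed,
}

/// Reduced model of an old owner's tracking info.
struct OwnerTracking {
    /// Component-level token, as in the new layout.
    pending_process_token: Option<u128>,
    /// Item path -> number of tracked states (more than 1 means pending).
    state_counts: BTreeMap<String, usize>,
}

/// Detection rule: component token is ours AND the specific item is multi-state.
fn detect(old: &OwnerTracking, path: &str, self_token: u128) -> Detection {
    let live = old.pending_process_token == Some(self_token);
    let item_pending = old.state_counts.get(path).is_some_and(|n| *n > 1);
    if live && item_pending {
        Detection::PendingRetry
    } else {
        Detection::Proceed
    }
}

/// Value written at the end of pre_commit: a token only if a sink action was queued.
fn token_after_pre_commit(queued_sink_actions: usize, self_token: u128) -> Option<u128> {
    if queued_sink_actions == 0 {
        None
    } else {
        Some(self_token)
    }
}

/// Rollback clears the token only when it belongs to this process.
/// Returns true when the tracking info changed and would need a write.
fn rollback(tracking: &mut OwnerTracking, self_token: u128) -> bool {
    if tracking.pending_process_token != Some(self_token) {
        return false;
    }
    tracking.pending_process_token = None;
    true
}
```

In this model, a component whose pre_commit touched only item `J` blocks a preempt of `J` but not of an untouched item `I`, which is the per-item filter the new detection comment describes. A token from a different process never produces `PendingRetry`.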

rust/core/src/state/db_schema.rs

Lines changed: 27 additions & 13 deletions
@@ -271,18 +271,6 @@ pub struct TargetStateInfoItem<'a> {
     /// It decides the generation of the provider.
     #[serde(rename = "G", default, skip_serializing_if = "Option::is_none")]
     pub provider_generation: Option<TargetStateProviderGeneration>,
-
-    /// Set by `pre_commit` when it queues a sink action against this item;
-    /// cleared by `commit_in_txn`'s retention pass after the lifecycle commits.
-    /// While set:
-    ///   - if the token equals the current process's startup token, another
-    ///     concurrent `pre_commit` in this process owns the item and observers
-    ///     must back off (see `PreCommitOutcome::PendingRetry`).
-    ///   - if the token does not match, the holding process crashed mid-
-    ///     lifecycle; observers take the dead-token recovery path (force
-    ///     `prev_may_be_missing = true`).
-    #[serde(rename = "T", default, skip_serializing_if = "Option::is_none")]
-    pub pending_process_token: Option<u128>,
 }

 impl<'a> TargetStateInfoItem<'a> {
impl<'a> TargetStateInfoItem<'a> {
@@ -296,9 +284,24 @@ impl<'a> TargetStateInfoItem<'a> {
                 .collect(),
             provider_schema_version: self.provider_schema_version,
             provider_generation: self.provider_generation,
-            pending_process_token: self.pending_process_token,
         }
     }
+
+    /// True iff this item's `states` carries an unsettled push from a
+    /// pre_commit that hasn't been finalized by `commit_in_txn`'s retention
+    /// pass — either an in-flight modification by *this* process, a crashed
+    /// prior process, or a rolled-back failed attempt. Used to force
+    /// `prev_may_be_missing = true` on reconcile so the sink gets a fresh
+    /// upsert/delete instead of a short-circuit "no change" decision.
+    ///
+    /// Invariant: at rest (after a successful `commit_in_txn`), every item
+    /// has `states.len() <= 1`. Retention always reduces the vec by dropping
+    /// pre-curr_version entries and curr_version-Deleted entries. Multi-state
+    /// only exists during the write→commit window or after a crash/rollback
+    /// of a prior lifecycle.
+    pub fn is_pending(&self) -> bool {
+        self.states.len() > 1
+    }
 }

 /// Inverted tracking: maps a `TargetStatePath` to the component that owns it.
/// Inverted tracking: maps a `TargetStatePath` to the component that owns it.
@@ -323,6 +326,16 @@ pub struct StablePathEntryTrackingInfo<'a> {
     pub target_state_items: BTreeMap<TargetStatePathWithProviderId, TargetStateInfoItem<'a>>,
     #[serde(rename = "N", borrow, default = "unknown_processor_name")]
     pub processor_name: Cow<'a, str>,
+    /// Set by `pre_commit` when it queues at least one sink action against
+    /// this component; cleared by `commit_in_txn` and by
+    /// `rollback_pending_tokens` on failure. Distinguishes a live in-flight
+    /// lifecycle in *this* process (token equals the process's startup token
+    /// → preempting components must back off and retry) from one left by a
+    /// crashed prior process (token is something else → observers proceed,
+    /// using the per-item multi-state signal to force
+    /// `prev_may_be_missing = true`). At-rest value is `None`.
+    #[serde(rename = "T", default, skip_serializing_if = "Option::is_none")]
+    pub pending_process_token: Option<u128>,
 }

 impl<'a> StablePathEntryTrackingInfo<'a> {
impl<'a> StablePathEntryTrackingInfo<'a> {
@@ -331,6 +344,7 @@ impl<'a> StablePathEntryTrackingInfo<'a> {
331344
version: 0,
332345
target_state_items: BTreeMap::new(),
333346
processor_name,
347+
pending_process_token: None,
334348
}
335349
}
336350
}
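
The at-rest invariant stated in the `is_pending` doc comment (`states.len() <= 1` after a successful commit) follows from the retention pass. The sketch below is one possible reading of that pass, not the actual `commit_in_txn` code: `ComponentTracking`, `ItemState`, and `commit_retention` are hypothetical names, and the retention rule (drop entries older than `curr_version` and `Deleted` entries at `curr_version`) is taken from the doc comment's wording.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a tracked state.
#[derive(Clone, Debug, PartialEq)]
enum ItemState {
    Value(String),
    Deleted,
}

/// Reduced model of a component's tracking info in the new layout.
struct ComponentTracking {
    /// Item path -> (version, state) pairs.
    items: BTreeMap<String, Vec<(u64, ItemState)>>,
    /// Component-level token; `None` at rest.
    pending_process_token: Option<u128>,
}

/// Retention pass (sketch): keep only live states written at `curr_version`,
/// prune items left with no states, then clear the component-level token.
/// Returns the pruned paths, in key order.
fn commit_retention(tracking: &mut ComponentTracking, curr_version: u64) -> Vec<String> {
    let mut pruned = Vec::new();
    tracking.items.retain(|path, states| {
        states.retain(|(version, state)| {
            *version >= curr_version && *state != ItemState::Deleted
        });
        if states.is_empty() {
            pruned.push(path.clone());
            false
        } else {
            true
        }
    });
    // The lifecycle succeeded, so the token is no longer "pending".
    tracking.pending_process_token = None;
    pruned
}
```

After this pass every surviving item holds a single state, so a later `is_pending()` check returns `true` only for items whose lifecycle never reached commit.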
