Skip to content

Commit 9fcf5b1

Browse files
Sweep forgotten client operation IDs (#1965)
* Sweep forgotten client operation IDs * add helpful log
1 parent 854d51c commit 9fcf5b1

2 files changed

Lines changed: 120 additions & 0 deletions

File tree

nativelink-scheduler/src/store_awaited_action_db.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,46 @@ where
614614
let Some(operation_id) = maybe_operation_id else {
615615
return Ok(None);
616616
};
617+
618+
// Validate that the internal operation actually exists.
619+
// If it doesn't, this is an orphaned client operation mapping that should be cleaned up.
620+
// This can happen when an operation is deleted (completed/timed out) but the
621+
// client_id -> operation_id mapping persists in the store.
622+
let maybe_awaited_action = match self
623+
.store
624+
.get_and_decode(OperationIdToAwaitedAction(Cow::Borrowed(&operation_id)))
625+
.await
626+
{
627+
Ok(maybe_action) => maybe_action,
628+
Err(err) if err.code == Code::NotFound => {
629+
tracing::warn!(
630+
"Orphaned client operation mapping detected: client_id={} maps to operation_id={}, \
631+
but the operation does not exist in the store (NotFound). This typically happens when \
632+
an operation completes or times out but the client mapping persists.",
633+
client_operation_id,
634+
operation_id
635+
);
636+
None
637+
}
638+
Err(err) => {
639+
// Some other error occurred
640+
return Err(err).err_tip(
641+
|| "In RedisAwaitedActionDb::get_awaited_action_by_id::validate_operation",
642+
);
643+
}
644+
};
645+
646+
if maybe_awaited_action.is_none() {
647+
tracing::warn!(
648+
"Found orphaned client operation mapping: client_id={} -> operation_id={}, \
649+
but operation no longer exists. Returning None to prevent client from polling \
650+
a non-existent operation.",
651+
client_operation_id,
652+
operation_id
653+
);
654+
return Ok(None);
655+
}
656+
617657
Ok(Some(OperationSubscriber::new(
618658
Some(client_operation_id.clone()),
619659
OperationIdToAwaitedAction(Cow::Owned(operation_id)),

nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,25 @@ async fn add_action_smoke_test() -> Result<(), Error> {
688688
])),
689689
None,
690690
)
691+
// Validation HMGET: Check if the internal operation exists (orphan detection)
692+
.expect(
693+
MockCommand {
694+
cmd: Str::from_static("HMGET"),
695+
subcommand: None,
696+
args: vec![
697+
format!("aa_{WORKER_OPERATION_ID}").as_bytes().into(),
698+
"version".as_bytes().into(),
699+
"data".as_bytes().into(),
700+
],
701+
},
702+
Ok(RedisValue::Array(vec![
703+
// Version.
704+
"1".into(),
705+
// Data.
706+
RedisValue::Bytes(Bytes::from(serde_json::to_string(&worker_awaited_action).unwrap())),
707+
])),
708+
None,
709+
)
691710
.expect(
692711
MockCommand {
693712
cmd: Str::from_static("HMGET"),
@@ -1029,3 +1048,64 @@ async fn test_outdated_version() -> Result<(), Error> {
10291048

10301049
Ok(())
10311050
}
1051+
1052+
/// Test that orphaned client operation ID mappings return None.
1053+
///
1054+
/// This tests the scenario where:
1055+
/// 1. A client operation ID mapping exists (cid_* → operation_id)
1056+
/// 2. The actual operation (aa_*) has been deleted (completed/timed out)
1057+
/// 3. get_awaited_action_by_id should return None instead of a subscriber to a non-existent operation
1058+
#[nativelink_test]
1059+
async fn test_orphaned_client_operation_id_returns_none() -> Result<(), Error> {
1060+
const CLIENT_OPERATION_ID: &str = "orphaned_client_id";
1061+
const INTERNAL_OPERATION_ID: &str = "deleted_internal_operation_id";
1062+
const SUB_CHANNEL: &str = "sub_channel";
1063+
1064+
let worker_operation_id = Arc::new(Mutex::new(INTERNAL_OPERATION_ID));
1065+
let worker_operation_id_clone = worker_operation_id.clone();
1066+
1067+
let internal_operation_id = OperationId::from(INTERNAL_OPERATION_ID);
1068+
1069+
// Use FakeRedisBackend which handles SUBSCRIBE automatically
1070+
let mocks = Arc::new(FakeRedisBackend::new());
1071+
let store = make_redis_store(SUB_CHANNEL, mocks.clone());
1072+
mocks.set_subscription_manager(store.subscription_manager().unwrap());
1073+
1074+
// Manually set up the orphaned state in the fake backend:
1075+
// 1. Add client_id → operation_id mapping (cid_* key)
1076+
{
1077+
let mut table = mocks.table.lock();
1078+
let mut client_fields = HashMap::new();
1079+
client_fields.insert(
1080+
"data".into(),
1081+
RedisValue::Bytes(Bytes::from(
1082+
serde_json::to_string(&internal_operation_id).unwrap(),
1083+
)),
1084+
);
1085+
table.insert(format!("cid_{CLIENT_OPERATION_ID}"), client_fields);
1086+
}
1087+
// 2. Don't add the actual operation (aa_* key) - this simulates it being deleted/orphaned
1088+
1089+
let notifier = Arc::new(Notify::new());
1090+
let awaited_action_db = StoreAwaitedActionDb::new(
1091+
store.clone(),
1092+
notifier.clone(),
1093+
MockInstantWrapped::default,
1094+
move || worker_operation_id_clone.lock().clone().into(),
1095+
)
1096+
.unwrap();
1097+
1098+
// Try to get the awaited action by the client operation ID
1099+
// This should return None because the internal operation doesn't exist (orphaned mapping)
1100+
let result = awaited_action_db
1101+
.get_awaited_action_by_id(&OperationId::from(CLIENT_OPERATION_ID))
1102+
.await
1103+
.expect("Should not error when checking orphaned client operation");
1104+
1105+
assert!(
1106+
result.is_none(),
1107+
"Expected None for orphaned client operation ID, but got a subscription"
1108+
);
1109+
1110+
Ok(())
1111+
}

0 commit comments

Comments
 (0)