Skip to content

Commit 3354945

Browse files
amankrx, MarcusSorealheis, and claude
authored
Fix Fast slow store Not Found error by returning failed precondition (#2194)
* Fix Fast slow store Not Found error by returning failed precondition

* Add tests for CAS NotFound to FailedPrecondition conversion

  Tests the conditional that converts NotFound errors containing "not found in either fast or slow store" to FailedPrecondition, and verifies that other NotFound errors still return InternalError.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Marcus <marcuseagan@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a7d873a commit 3354945

2 files changed

Lines changed: 237 additions & 5 deletions

File tree

nativelink-worker/src/local_worker.rs

Lines changed: 28 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -369,11 +369,34 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke
369369
.err_tip(|| "Error while calling execution_response")?;
370370
},
371371
Err(e) => {
372-
grpc_client.execution_response(ExecuteResult{
373-
instance_name,
374-
operation_id,
375-
result: Some(execute_result::Result::InternalError(e.into())),
376-
}).await.err_tip(|| "Error calling execution_response with error")?;
372+
let is_cas_blob_missing = e.code == Code::NotFound
373+
&& e.message_string().contains("not found in either fast or slow store");
374+
if is_cas_blob_missing {
375+
warn!(
376+
?e,
377+
"Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION"
378+
);
379+
let action_result = ActionResult {
380+
error: Some(make_err!(
381+
Code::FailedPrecondition,
382+
"{}",
383+
e.message_string()
384+
)),
385+
..ActionResult::default()
386+
};
387+
let action_stage = ActionStage::Completed(action_result);
388+
grpc_client.execution_response(ExecuteResult{
389+
instance_name,
390+
operation_id,
391+
result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
392+
}).await.err_tip(|| "Error calling execution_response with missing inputs")?;
393+
} else {
394+
grpc_client.execution_response(ExecuteResult{
395+
instance_name,
396+
operation_id,
397+
result: Some(execute_result::Result::InternalError(e.into())),
398+
}).await.err_tip(|| "Error calling execution_response with error")?;
399+
}
377400
},
378401
}
379402
Ok(())

nativelink-worker/tests/local_worker_test.rs

Lines changed: 209 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -752,6 +752,215 @@ async fn kill_action_request_kills_action() -> Result<(), Error> {
752752
Ok(())
753753
}
754754

755+
#[nativelink_test]
756+
async fn cas_not_found_returns_failed_precondition_test() -> Result<(), Error> {
757+
let mut test_context = setup_local_worker(HashMap::new()).await;
758+
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
759+
760+
{
761+
let props = test_context
762+
.client
763+
.expect_connect_worker(Ok(streaming_response))
764+
.await;
765+
assert_eq!(props, ConnectWorkerRequest::default());
766+
}
767+
768+
let expected_worker_id = "foobar".to_string();
769+
770+
let tx_stream = test_context.maybe_tx_stream.take().unwrap();
771+
{
772+
tx_stream
773+
.send(Frame::data(
774+
encode_stream_proto(&UpdateForWorker {
775+
update: Some(Update::ConnectionResult(ConnectionResult {
776+
worker_id: expected_worker_id.clone(),
777+
})),
778+
})
779+
.unwrap(),
780+
))
781+
.await
782+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
783+
}
784+
785+
let action_digest = DigestInfo::new([3u8; 32], 10);
786+
let action_info = ActionInfo {
787+
command_digest: DigestInfo::new([1u8; 32], 10),
788+
input_root_digest: DigestInfo::new([2u8; 32], 10),
789+
timeout: Duration::from_secs(1),
790+
platform_properties: HashMap::new(),
791+
priority: 0,
792+
load_timestamp: SystemTime::UNIX_EPOCH,
793+
insert_timestamp: SystemTime::UNIX_EPOCH,
794+
unique_qualifier: ActionUniqueQualifier::Uncacheable(ActionUniqueKey {
795+
instance_name: INSTANCE_NAME.to_string(),
796+
digest_function: DigestHasherFunc::Sha256,
797+
digest: action_digest,
798+
}),
799+
};
800+
801+
{
802+
tx_stream
803+
.send(Frame::data(
804+
encode_stream_proto(&UpdateForWorker {
805+
update: Some(Update::StartAction(StartExecute {
806+
execute_request: Some((&action_info).into()),
807+
operation_id: String::new(),
808+
queued_timestamp: None,
809+
platform: Some(Platform::default()),
810+
worker_id: expected_worker_id.clone(),
811+
})),
812+
})
813+
.unwrap(),
814+
))
815+
.await
816+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
817+
}
818+
819+
let running_action = Arc::new(MockRunningAction::new());
820+
821+
// Send and wait for response from create_and_add_action.
822+
test_context
823+
.actions_manager
824+
.expect_create_and_add_action(Ok(running_action.clone()))
825+
.await;
826+
827+
// Simulate prepare_action failing with a CAS NotFound error containing the
828+
// specific "not found in either fast or slow store" message. This is the exact
829+
// condition that the code checks to decide whether to return FailedPrecondition.
830+
running_action
831+
.expect_prepare_action(Err(make_err!(
832+
Code::NotFound,
833+
"Hash 0123456789abcdef not found in either fast or slow store"
834+
)))
835+
.await?;
836+
837+
// Cleanup is still called even when prepare_action fails.
838+
running_action.cleanup(Ok(())).await?;
839+
840+
// The worker should respond with FailedPrecondition wrapped in an ExecuteResponse,
841+
// NOT an InternalError. This allows Bazel to re-upload the missing artifacts.
842+
let execution_response = test_context.client.expect_execution_response(Ok(())).await;
843+
844+
let expected_action_result = ActionResult {
845+
error: Some(make_err!(
846+
Code::FailedPrecondition,
847+
"Hash 0123456789abcdef not found in either fast or slow store"
848+
)),
849+
..ActionResult::default()
850+
};
851+
assert_eq!(
852+
execution_response,
853+
ExecuteResult {
854+
instance_name: INSTANCE_NAME.to_string(),
855+
operation_id: String::new(),
856+
result: Some(execute_result::Result::ExecuteResponse(
857+
ActionStage::Completed(expected_action_result).into()
858+
)),
859+
}
860+
);
861+
862+
Ok(())
863+
}
864+
865+
#[nativelink_test]
866+
async fn non_cas_not_found_returns_internal_error_test() -> Result<(), Error> {
867+
let mut test_context = setup_local_worker(HashMap::new()).await;
868+
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
869+
870+
{
871+
let props = test_context
872+
.client
873+
.expect_connect_worker(Ok(streaming_response))
874+
.await;
875+
assert_eq!(props, ConnectWorkerRequest::default());
876+
}
877+
878+
let expected_worker_id = "foobar".to_string();
879+
880+
let tx_stream = test_context.maybe_tx_stream.take().unwrap();
881+
{
882+
tx_stream
883+
.send(Frame::data(
884+
encode_stream_proto(&UpdateForWorker {
885+
update: Some(Update::ConnectionResult(ConnectionResult {
886+
worker_id: expected_worker_id.clone(),
887+
})),
888+
})
889+
.unwrap(),
890+
))
891+
.await
892+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
893+
}
894+
895+
let action_digest = DigestInfo::new([3u8; 32], 10);
896+
let action_info = ActionInfo {
897+
command_digest: DigestInfo::new([1u8; 32], 10),
898+
input_root_digest: DigestInfo::new([2u8; 32], 10),
899+
timeout: Duration::from_secs(1),
900+
platform_properties: HashMap::new(),
901+
priority: 0,
902+
load_timestamp: SystemTime::UNIX_EPOCH,
903+
insert_timestamp: SystemTime::UNIX_EPOCH,
904+
unique_qualifier: ActionUniqueQualifier::Uncacheable(ActionUniqueKey {
905+
instance_name: INSTANCE_NAME.to_string(),
906+
digest_function: DigestHasherFunc::Sha256,
907+
digest: action_digest,
908+
}),
909+
};
910+
911+
{
912+
tx_stream
913+
.send(Frame::data(
914+
encode_stream_proto(&UpdateForWorker {
915+
update: Some(Update::StartAction(StartExecute {
916+
execute_request: Some((&action_info).into()),
917+
operation_id: String::new(),
918+
queued_timestamp: None,
919+
platform: Some(Platform::default()),
920+
worker_id: expected_worker_id.clone(),
921+
})),
922+
})
923+
.unwrap(),
924+
))
925+
.await
926+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
927+
}
928+
929+
let running_action = Arc::new(MockRunningAction::new());
930+
931+
test_context
932+
.actions_manager
933+
.expect_create_and_add_action(Ok(running_action.clone()))
934+
.await;
935+
936+
// Simulate prepare_action failing with a NotFound error that does NOT contain
937+
// the CAS-specific message. This should result in an InternalError, not
938+
// FailedPrecondition.
939+
let other_not_found_error = make_err!(Code::NotFound, "Some other resource was not found");
940+
running_action
941+
.expect_prepare_action(Err(other_not_found_error.clone()))
942+
.await?;
943+
944+
// Cleanup is still called even when prepare_action fails.
945+
running_action.cleanup(Ok(())).await?;
946+
947+
// The worker should respond with InternalError since this is not a CAS blob miss.
948+
let execution_response = test_context.client.expect_execution_response(Ok(())).await;
949+
950+
assert_eq!(
951+
execution_response,
952+
ExecuteResult {
953+
instance_name: INSTANCE_NAME.to_string(),
954+
operation_id: String::new(),
955+
result: Some(execute_result::Result::InternalError(
956+
other_not_found_error.into()
957+
)),
958+
}
959+
);
960+
961+
Ok(())
962+
}
963+
755964
#[cfg(target_family = "unix")]
756965
#[nativelink_test]
757966
async fn preconditions_met_extra_envs() -> Result<(), Error> {

0 commit comments

Comments
 (0)