diff --git a/app/buck2_action_impl/src/actions/impls/run.rs b/app/buck2_action_impl/src/actions/impls/run.rs index 25d6be6082234..b13ed22d4f6cf 100644 --- a/app/buck2_action_impl/src/actions/impls/run.rs +++ b/app/buck2_action_impl/src/actions/impls/run.rs @@ -1076,9 +1076,14 @@ impl RunAction { // First, check in the local dep file cache if an identical action can be found there. // Do this before checking the action cache as we can avoid a potentially large download. // Once the action cache lookup misses, we will do the full dep file cache look up. - let (outputs, should_fully_check_dep_file_cache) = dep_file_bundle - .check_local_dep_file_cache_for_identical_action(ctx, self.outputs.as_slice()) - .await?; + let should_bypass_action_cache = ctx.should_bypass_action_cache(); + let (outputs, should_fully_check_dep_file_cache) = if should_bypass_action_cache { + (None, false) + } else { + dep_file_bundle + .check_local_dep_file_cache_for_identical_action(ctx, self.outputs.as_slice()) + .await? + }; if let Some((outputs, metadata)) = outputs { return Ok(ExecuteResult::LocalDepFileHit(outputs, metadata)); } @@ -1541,7 +1546,8 @@ impl Action for RunAction { waiting_data: WaitingData, ) -> Result<(ActionOutputs, ActionExecutionMetadata), ExecuteError> { // Check offline cache first if parameter enabled - if self.inner.allow_offline_output_cache + if !ctx.should_bypass_action_cache() + && self.inner.allow_offline_output_cache && ctx.run_action_knobs().use_network_action_output_cache { if let Some((outputs, metadata)) = self.execute_for_offline(ctx).await? { diff --git a/app/buck2_build_api/src/actions.rs b/app/buck2_build_api/src/actions.rs index 4e32cae6f6861..fc7f02c01f080 100644 --- a/app/buck2_build_api/src/actions.rs +++ b/app/buck2_build_api/src/actions.rs @@ -97,6 +97,7 @@ pub mod execute; pub mod impls; pub mod query; pub mod registry; +pub mod rewind; /// Represents an unregistered 'Action' that will be registered into the 'Actions' module. /// The 'UnregisteredAction' is not executable until it is registered, upon which it becomes an @@ -286,6 +287,10 @@ pub trait ActionExecutionCtx: Send + Sync { prepared_action: &PreparedAction, ) -> ControlFlow; + fn should_bypass_action_cache(&self) -> bool { + false + } + async fn cache_upload( &mut self, action: &ActionDigestAndBlobs, diff --git a/app/buck2_build_api/src/actions/calculation.rs b/app/buck2_build_api/src/actions/calculation.rs index 72214bf1a525a..bfba55691af45 100644 --- a/app/buck2_build_api/src/actions/calculation.rs +++ b/app/buck2_build_api/src/actions/calculation.rs @@ -21,6 +21,7 @@ use buck2_artifact::artifact::build_artifact::BuildArtifact; use buck2_build_signals::env::NodeDuration; use buck2_build_signals::env::WaitingData; use buck2_common::events::HasEvents; +use buck2_common::file_ops::metadata::FileDigest; use buck2_core::deferred::base_deferred_key::BaseDeferredKey; use buck2_core::fs::artifact_path_resolver::ArtifactFs; use buck2_core::fs::project_rel_path::ProjectRelativePathBuf; @@ -29,15 +30,24 @@ use buck2_data::ActionErrorDiagnostics; use buck2_data::ActionSubErrors; use buck2_data::ToProtoMessage; use buck2_data::get_action_digest; +use buck2_directory::directory::directory::Directory; +use buck2_directory::directory::directory_iterator::DirectoryIterator; +use buck2_directory::directory::entry::DirectoryEntry; use buck2_error::BuckErrorContext; +use buck2_error::ErrorTag; use buck2_event_observer::action_util::get_execution_time_ms; use buck2_events::dispatch::async_record_root_spans; use buck2_events::dispatch::get_dispatcher; use buck2_events::dispatch::span_async; use buck2_events::span::SpanId; use buck2_execute::artifact::artifact_dyn::ArtifactDyn; +use buck2_execute::artifact_value::ArtifactValue; +use buck2_execute::directory::ActionDirectoryMember; +use buck2_execute::directory::ActionSharedDirectory; use buck2_execute::execute::result::CommandExecutionReport; use buck2_execute::execute::result::CommandExecutionStatus; +use buck2_execute::materialize::materializer::ReLostInput; +use buck2_execute::materialize::materializer::ReLostInputs; use buck2_execute::output_size::OutputSize; use buck2_hash::BuckIndexMap; use buck2_interpreter::print_handler::EventDispatcherPrintHandler; @@ -62,6 +72,7 @@ use smallvec::SmallVec; use starlark::environment::Module; use starlark::eval::Evaluator; use tracing::debug; +use tracing::info; use crate::actions::RegisteredAction; use crate::actions::artifact::get_artifact_fs::GetArtifactFs; @@ -69,10 +80,13 @@ use crate::actions::error::ActionError; use crate::actions::error_handler::ActionErrorHandlerError; use crate::actions::error_handler::ActionSubErrorResult; use crate::actions::error_handler::StarlarkActionErrorContext; +use crate::actions::execute::action_executor::ActionExecutionMetadata; use crate::actions::execute::action_executor::ActionOutputs; use crate::actions::execute::action_executor::BuckActionExecutor; use crate::actions::execute::action_executor::HasActionExecutor; use crate::actions::execute::error::ExecuteError; +use crate::actions::rewind::ActionRewindRequest; +use crate::actions::rewind::HasActionRewindTracker; use crate::artifact_groups::ArtifactGroup; use crate::artifact_groups::ArtifactGroupValues; use crate::artifact_groups::calculation::ensure_artifact_group_staged; @@ -110,7 +124,15 @@ async fn build_action_impl( build_action_no_redirect(ctx, cancellation, action).await } -async fn build_action_no_redirect( +fn build_action_no_redirect<'a>( + ctx: &'a mut DiceComputations<'_>, + cancellation: &'a CancellationContext, + action: Arc, +) -> BoxFuture<'a, buck2_error::Result> { + build_action_no_redirect_impl(ctx, cancellation, action).boxed() +} + +async fn build_action_no_redirect_impl( ctx: &mut DiceComputations<'_>, cancellation: &CancellationContext, action: Arc, @@ -310,9 +332,17 @@ async fn build_action_inner( } None => buck2_data::ExpectedEligibleForDedupe::UnknownEligibility, }; - let (execute_result, command_reports) = executor - .execute(waiting_data, ensured_inputs, action, cancellation) - .await; + let skip_action_cache = ctx.per_transaction_data().is_action_rewound(action.key()); + let (execute_result, command_reports) = execute_with_lost_input_rewind_request( + ctx, + cancellation, + executor, + waiting_data, + ensured_inputs, + action, + skip_action_cache, + ) + .await; let allow_omit_details = execute_result.is_ok(); @@ -570,6 +600,270 @@ fn is_action_eligible_for_dedupe( buck2_data::EligibleForDedupe::Eligible } +struct LostCasInput { + digest: FileDigest, + path: ProjectRelativePathBuf, +} + +async fn execute_with_lost_input_rewind_request( + _ctx: &mut DiceComputations<'_>, + cancellation: &CancellationContext, + executor: &BuckActionExecutor, + waiting_data: WaitingData, + ensured_inputs: BuckIndexMap, + action: &Arc, + skip_action_cache: bool, +) -> ( + Result<(ActionOutputs, ActionExecutionMetadata), ExecuteError>, + Vec, +) { + let (execute_result, command_reports) = executor + .execute( + waiting_data, + ensured_inputs.clone(), + action, + cancellation, + skip_action_cache, + ) + .await; + + let execute_result = match execute_result { + Ok(result) => Ok(result), + Err(error) => Err(add_lost_input_rewind_request( + action.key(), + &ensured_inputs, + error, + )), + }; + + (execute_result, command_reports) +} + +fn add_lost_input_rewind_request( + consumer_key: &ActionKey, + ensured_inputs: &BuckIndexMap, + error: ExecuteError, +) -> ExecuteError { + let lost_inputs = lost_cas_inputs(&error); + let request = if !lost_inputs.is_empty() { + let producer_keys = lost_input_producers(consumer_key, ensured_inputs, &lost_inputs); + if producer_keys.is_empty() { + return error; + } + + rewind_request(producer_keys, consumer_key) + } else if is_missing_input_re_error(&error) { + let producer_keys = input_producers(consumer_key, ensured_inputs); + if producer_keys.is_empty() { + return error; + } + + for producer_key in &producer_keys { + info!( + consumer_action = %consumer_key, + producer_action = %producer_key, + "Requesting DICE graph rewind after remote execution reported a missing input" + ); + } + + rewind_request(producer_keys, consumer_key) + } else if is_missing_input_materialization_error(&error) { + let producer_keys = input_producers(consumer_key, ensured_inputs); + if producer_keys.is_empty() { + return error; + } + + for producer_key in &producer_keys { + info!( + consumer_action = %consumer_key, + producer_action = %producer_key, + "Requesting DICE graph rewind after local input materialization reported a missing input" + ); + } + + rewind_request(producer_keys, consumer_key) + } else { + return error; + }; + + match error { + ExecuteError::Error { error } => ExecuteError::Error { + error: error.context(request), + }, + ExecuteError::CommandExecutionError { + error: Some(error), .. + } => ExecuteError::Error { + error: error.context(request), + }, + error => error, + } +} + +fn rewind_request( + mut producer_keys: Vec, + consumer_key: &ActionKey, +) -> ActionRewindRequest { + producer_keys.push(consumer_key.dupe()); + ActionRewindRequest::new(producer_keys) +} + +fn lost_cas_inputs(error: &ExecuteError) -> Vec { + let Some(error) = execute_error(error) else { + return Vec::new(); + }; + + if let Some(lost_inputs) = error.find_typed_context::() { + return lost_inputs + .inputs() + .iter() + .map(|lost_input| LostCasInput { + digest: lost_input.digest.clone(), + path: lost_input.path.clone(), + }) + .collect(); + } + + error + .find_typed_context::() + .map(|lost_input| { + vec![LostCasInput { + digest: lost_input.digest.clone(), + path: lost_input.path.clone(), + }] + }) + .unwrap_or_default() +} + +fn lost_input_producers( + consumer_key: &ActionKey, + ensured_inputs: &BuckIndexMap, + lost_inputs: &[LostCasInput], +) -> Vec { + let mut producer_keys = Vec::new(); + + for lost_input in lost_inputs { + let Some(producer_key) = + lost_input_producer(consumer_key, ensured_inputs, &lost_input.digest) + else { + continue; + }; + + info!( + consumer_action = %consumer_key, + producer_action = %producer_key, + lost_input_path = %lost_input.path, + lost_input_digest = %lost_input.digest, + "Requesting DICE graph rewind after a generated input disappeared from remote CAS" + ); + + if producer_keys.iter().any(|key| key == &producer_key) { + continue; + } + producer_keys.push(producer_key); + } + + producer_keys +} + +fn lost_input_producer( + consumer_key: &ActionKey, + ensured_inputs: &BuckIndexMap, + lost_digest: &FileDigest, +) -> Option { + for values in ensured_inputs.values() { + for (artifact, value) in values.iter() { + if !artifact_value_contains_digest(value, lost_digest) { + continue; + } + let Some(action_key) = artifact.action_key() else { + continue; + }; + if action_key == consumer_key { + continue; + } + return Some(action_key.dupe()); + } + } + + None +} + +fn input_producers( + consumer_key: &ActionKey, + ensured_inputs: &BuckIndexMap, +) -> Vec { + let mut producer_keys = Vec::new(); + + for values in ensured_inputs.values() { + for (artifact, _) in values.iter() { + let Some(action_key) = artifact.action_key() else { + continue; + }; + if action_key == consumer_key { + continue; + } + if producer_keys.iter().any(|existing| existing == action_key) { + continue; + } + producer_keys.push(action_key.dupe()); + } + } + + producer_keys +} + +fn artifact_value_contains_digest(value: &ArtifactValue, lost_digest: &FileDigest) -> bool { + match value.entry() { + DirectoryEntry::Dir(directory) => directory_contains_digest(directory, lost_digest), + DirectoryEntry::Leaf(ActionDirectoryMember::File(file)) => { + file.digest.data() == lost_digest + } + DirectoryEntry::Leaf(..) => false, + } +} + +fn directory_contains_digest(directory: &ActionSharedDirectory, lost_digest: &FileDigest) -> bool { + directory + .unordered_walk_leaves() + .without_paths() + .any(|entry| match entry { + ActionDirectoryMember::File(file) => file.digest.data() == lost_digest, + ActionDirectoryMember::Symlink(_) | ActionDirectoryMember::ExternalSymlink(_) => false, + }) +} + +fn is_missing_input_re_error(error: &ExecuteError) -> bool { + let Some(error) = execute_error(error) else { + return false; + }; + if !error.tags().contains(&ErrorTag::ReFailedPrecondition) { + return false; + } + + let message = format!("{error:#}").to_ascii_lowercase(); + message.contains("missing") + && (message.contains("input") || message.contains("blob") || message.contains("cas")) +} + +fn is_missing_input_materialization_error(error: &ExecuteError) -> bool { + let Some(error) = execute_error(error) else { + return false; + }; + + error.tags().contains(&ErrorTag::MaterializationError) + && error.tags().contains(&ErrorTag::ReNotFound) +} + +fn execute_error(error: &ExecuteError) -> Option<&buck2_error::Error> { + match error { + ExecuteError::Error { error } => Some(error), + ExecuteError::CommandExecutionError { + error: Some(error), .. + } => Some(error), + _ => None, + } +} + fn check_infra_error_patterns( last_command: Option<&buck2_data::CommandExecution>, ) -> Option { diff --git a/app/buck2_build_api/src/actions/execute/action_executor.rs b/app/buck2_build_api/src/actions/execute/action_executor.rs index 70a61d4461a11..984ce8215d559 100644 --- a/app/buck2_build_api/src/actions/execute/action_executor.rs +++ b/app/buck2_build_api/src/actions/execute/action_executor.rs @@ -393,6 +393,7 @@ struct BuckActionExecutionContext<'a> { outputs: &'a [BuildArtifact], command_reports: &'a mut Vec, cancellations: &'a CancellationContext, + skip_action_cache: bool, } #[async_trait] @@ -497,6 +498,10 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { request: &CommandExecutionRequest, prepared_action: &PreparedAction, ) -> ControlFlow { + if self.skip_action_cache { + return ControlFlow::Continue(manager); + } + let action = self.target(); self.executor .command_executor @@ -507,6 +512,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { request, prepared_action, digest_config: self.digest_config(), + force_skip_cache_read: false, }, self.cancellations, ) @@ -519,6 +525,10 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { request: &CommandExecutionRequest, prepared_action: &PreparedAction, ) -> ControlFlow { + if self.skip_action_cache { + return ControlFlow::Continue(manager); + } + let action = self.target(); self.executor .command_executor @@ -529,6 +539,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { request, prepared_action, digest_config: self.digest_config(), + force_skip_cache_read: false, }, self.cancellations, ) @@ -623,12 +634,17 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { request, prepared_action, digest_config: self.digest_config(), + force_skip_cache_read: self.skip_action_cache, }, self.cancellations, ) .await } + fn should_bypass_action_cache(&self) -> bool { + self.skip_action_cache + } + async fn cache_upload( &mut self, action_digest_and_blobs: &ActionDigestAndBlobs, @@ -713,6 +729,7 @@ impl BuckActionExecutor { inputs: BuckIndexMap, action: &RegisteredAction, cancellations: &CancellationContext, + skip_action_cache: bool, ) -> ( Result<(ActionOutputs, ActionExecutionMetadata), ExecuteError>, Vec, @@ -729,6 +746,7 @@ impl BuckActionExecutor { outputs: outputs.as_ref(), command_reports: &mut command_reports, cancellations, + skip_action_cache, }; let (result, metadata) = action.execute(&mut ctx, waiting_data).await?; @@ -1101,6 +1119,7 @@ mod tests { Default::default(), &action, CancellationContext::testing(), + false, ), ) .await diff --git a/app/buck2_build_api/src/actions/rewind.rs b/app/buck2_build_api/src/actions/rewind.rs new file mode 100644 index 0000000000000..08ddb721fd3cc --- /dev/null +++ b/app/buck2_build_api/src/actions/rewind.rs @@ -0,0 +1,119 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is dual-licensed under either the MIT license found in the + * LICENSE-MIT file in the root directory of this source tree or the Apache + * License, Version 2.0 found in the LICENSE-APACHE file in the root directory + * of this source tree. You may select, at your option, one of the + * above-listed licenses. + */ + +use std::any::Any; + +use allocative::Allocative; +use buck2_artifact::actions::key::ActionKey; +use buck2_error::TypedContext; +use buck2_hash::BuckDashSet; +use dice::UserComputationData; +use dupe::Dupe; + +#[derive(Allocative, Clone, Debug, Eq, PartialEq)] +pub struct ActionRewindRequest { + action_keys: Vec, +} + +impl ActionRewindRequest { + pub fn new(action_keys: Vec) -> Self { + Self { action_keys } + } + + pub fn action_keys(&self) -> &[ActionKey] { + &self.action_keys + } + + pub fn into_action_keys(self) -> Vec { + self.action_keys + } + + pub fn merge(&mut self, other: &Self) { + for key in &other.action_keys { + if !self.action_keys.iter().any(|existing| existing == key) { + self.action_keys.push(key.dupe()); + } + } + } +} + +impl TypedContext for ActionRewindRequest { + fn eq(&self, other: &dyn TypedContext) -> bool { + (other as &dyn Any).downcast_ref::() == Some(self) + } + + fn display(&self) -> Option { + Some(format!("ActionRewindRequest({:?})", self.action_keys)) + } +} + +pub struct ActionRewindTrackerHolder(BuckDashSet); + +pub trait HasActionRewindTracker { + fn init_action_rewind_tracker(&mut self); + + fn set_rewound_actions(&self, action_keys: &[ActionKey]); + + fn is_action_rewound(&self, action_key: &ActionKey) -> bool; +} + +impl HasActionRewindTracker for UserComputationData { + fn init_action_rewind_tracker(&mut self) { + self.data + .set(ActionRewindTrackerHolder(BuckDashSet::default())); + } + + fn set_rewound_actions(&self, action_keys: &[ActionKey]) { + let holder = self + .data + .get::() + .expect("ActionRewindTracker should be set"); + for action_key in action_keys { + holder.0.insert(action_key.dupe()); + } + } + + fn is_action_rewound(&self, action_key: &ActionKey) -> bool { + self.data + .get::() + .is_ok_and(|holder| holder.0.contains(action_key)) + } +} + +#[cfg(test)] +mod tests { + use buck2_artifact::actions::key::ActionIndex; + use buck2_core::deferred::key::DeferredHolderKey; + use dice::UserComputationData; + + use super::*; + + fn action_key(id: u32) -> ActionKey { + ActionKey::new( + DeferredHolderKey::testing_new("cell//pkg:target"), + ActionIndex::new(id), + ) + } + + #[test] + fn rewound_actions_accumulate_across_rewinds() { + let mut data = UserComputationData::new(); + data.init_action_rewind_tracker(); + + let first = action_key(0); + let second = action_key(1); + + data.set_rewound_actions(&[first.clone()]); + data.set_rewound_actions(&[second.clone()]); + + assert!(data.is_action_rewound(&first)); + assert!(data.is_action_rewound(&second)); + } +} diff --git a/app/buck2_build_api/src/materialize.rs b/app/buck2_build_api/src/materialize.rs index f28c38930df0d..ecf7d4e306486 100644 --- a/app/buck2_build_api/src/materialize.rs +++ b/app/buck2_build_api/src/materialize.rs @@ -22,6 +22,7 @@ use buck2_common::legacy_configs::view::LegacyBuckConfigView; use buck2_core::execution_types::executor_config::RemoteExecutorUseCase; use buck2_core::fs::project_rel_path::ProjectRelativePath; use buck2_error::BuckErrorContext; +use buck2_error::ErrorTag; use buck2_execute::artifact::artifact_dyn::ArtifactDyn; use buck2_execute::artifact_utils::ArtifactValueBuilder; use buck2_execute::artifact_value::ArtifactValue; @@ -40,6 +41,7 @@ use crate::actions::artifact::get_artifact_fs::GetArtifactFs; use crate::actions::artifact::materializer::ArtifactMaterializer; use crate::actions::execute::dice_data::GetReClient; use crate::actions::impls::run_action_knobs::HasRunActionKnobs; +use crate::actions::rewind::ActionRewindRequest; use crate::artifact_groups::ArtifactGroup; use crate::artifact_groups::ArtifactGroupValues; use crate::artifact_groups::calculation::ArtifactGroupCalculation; @@ -64,7 +66,9 @@ pub async fn materialize_and_upload_artifact_group( async move { match contexts.1 { UploadContext::Skip => Ok(()), - UploadContext::Upload => ensure_uploaded(ctx, group).await, + UploadContext::Upload => { + ensure_uploaded(ctx, group, contexts.0, queue_tracker).await + } } } .boxed() @@ -99,80 +103,132 @@ async fn materialize_artifact_group( ctx.per_transaction_data().get_materializer(), )); - let mut materialize_futs = Vec::new(); + { + let mut materialize_futs = Vec::new(); - for (artifact, value) in values.iter() { - if let BaseArtifactKind::Build(artifact) = artifact.as_parts().0 { - if !queue_tracker.insert(artifact.dupe()) { - // We've already requested this artifact, no use requesting it again. - continue; - } + for (artifact, value) in values.iter() { + if let BaseArtifactKind::Build(artifact) = artifact.as_parts().0 { + if !queue_tracker.insert(artifact.dupe()) { + // We've already requested this artifact, no use requesting it again. + continue; + } - let fut = { - let waiting_data = waiting_data.clone(); - let artifact = artifact.dupe(); - let value = value.dupe(); - let shared_data = shared_data.dupe(); - let artifact_group = artifact_group.dupe(); + let fut = { + let waiting_data = waiting_data.clone(); + let artifact = artifact.dupe(); + let value = value.dupe(); + let shared_data = shared_data.dupe(); + let artifact_group = artifact_group.dupe(); - async move { - let (data, artifact_fs, materializer) = &*shared_data; + async move { + let (data, artifact_fs, materializer) = &*shared_data; - let configuration_hash_path = artifact_fs - .resolve_build_configuration_hash_path(artifact.get_path())?; + let configuration_hash_path = artifact_fs + .resolve_build_configuration_hash_path(artifact.get_path())?; - if artifact.get_path().is_content_based_path() { - let content_based_path = artifact_fs.resolve_build( - artifact.get_path(), - Some(&value.content_based_path_hash()), - )?; - let mut builder = - ArtifactValueBuilder::new(artifact_fs.fs(), digest_config); - builder.add_symlinked( - // The materializer doesn't care about the `src_value`. - &ArtifactValue::dir(digest_config.empty_directory()), - content_based_path, - &configuration_hash_path, - )?; - let symlink_value = builder.build(&configuration_hash_path)?; + if artifact.get_path().is_content_based_path() { + let content_based_path = artifact_fs.resolve_build( + artifact.get_path(), + Some(&value.content_based_path_hash()), + )?; + let mut builder = + ArtifactValueBuilder::new(artifact_fs.fs(), digest_config); + builder.add_symlinked( + // The materializer doesn't care about the `src_value`. + &ArtifactValue::dir(digest_config.empty_directory()), + content_based_path, + &configuration_hash_path, + )?; + let symlink_value = builder.build(&configuration_hash_path)?; - materializer + materializer .declare_copy(configuration_hash_path.clone(), symlink_value, Vec::new(), None) .await .buck_error_context( "Failed to declare configuration path to content-based path symlinks", )?; + } + + data.try_materialize_requested_artifact( + &artifact, + waiting_data, + force, + configuration_hash_path, + &artifact_group, + ) + .await + .buck_error_context("Failed to materialize artifacts")?; + buck2_error::Ok(()) } + }; + materialize_futs.push(spawn_dropcancel( + move |_cancellations| fut.boxed(), + &*data.per_transaction_data().spawner, + data.per_transaction_data(), + )); + } + } - data.try_materialize_requested_artifact( - &artifact, - waiting_data, - force, - configuration_hash_path, - &artifact_group, - ) - .await - .buck_error_context("Failed to materialize artifacts")?; - buck2_error::Ok(()) - } - }; - materialize_futs.push(spawn_dropcancel( - move |_cancellations| fut.boxed(), - &*data.per_transaction_data().spawner, - data.per_transaction_data(), - )); + match buck2_util::future::try_join_all(materialize_futs).await { + Ok(_) => {} + Err(error) if error.tags().contains(&ErrorTag::ReNotFound) => { + remove_materialization_queue_entries(&values, queue_tracker); + return Err(match artifact_group_values_rewind_request(&values) { + Some(request) => error.context(request), + None => error, + }); + } + Err(error) => return Err(error), } } - - buck2_util::future::try_join_all(materialize_futs).await?; } Ok(values) } +fn artifact_group_values_rewind_request( + values: &ArtifactGroupValues, +) -> Option { + let mut producers = Vec::new(); + + for (artifact, _) in values.iter() { + if let BaseArtifactKind::Build(artifact) = artifact.as_parts().0 { + if !producers.iter().any(|key| key == artifact.key()) { + producers.push(artifact.key().dupe()); + } + } + } + + if producers.is_empty() { + return None; + } + + for producer in &producers { + tracing::info!( + producer_action = %producer, + "Requesting DICE graph rewind after a requested output disappeared from remote CAS" + ); + } + + Some(ActionRewindRequest::new(producers)) +} + +fn remove_materialization_queue_entries( + values: &ArtifactGroupValues, + queue_tracker: &Arc>, +) { + for (artifact, _) in values.iter() { + if let BaseArtifactKind::Build(artifact) = artifact.as_parts().0 { + queue_tracker.remove(artifact); + } + } +} + async fn ensure_uploaded( ctx: &mut DiceComputations<'_>, artifact_group: &ArtifactGroup, + materialization_context: MaterializationContext, + queue_tracker: &Arc>, ) -> buck2_error::Result<()> { let digest_config = ctx.global_data().get_digest_config(); let artifact_fs = ctx.get_artifact_fs().await?; @@ -205,7 +261,8 @@ async fn ensure_uploaded( .map_or_else(RemoteExecutorUseCase::buck2_default, |v| { RemoteExecutorUseCase::new((*v).to_owned()) }); - ctx.per_transaction_data() + match ctx + .per_transaction_data() .get_re_client() .with_use_case(re_use_case) .upload( @@ -220,9 +277,26 @@ async fn ensure_uploaded( .get_run_action_knobs() .deduplicate_get_digests_ttl_calls, ) - .await?; - - Ok(()) + .await + { + Ok(_) => Ok(()), + Err(error) if error.tags().contains(&ErrorTag::ReNotFound) => { + // Upload and materialization run through try_compute2. If upload + // requests a rewind first, materialization can be dropped before + // it removes the artifacts it already queued. + if matches!( + materialization_context, + MaterializationContext::Materialize { .. } + ) { + remove_materialization_queue_entries(&values, queue_tracker); + } + Err(match artifact_group_values_rewind_request(&values) { + Some(request) => error.context(request), + None => error, + }) + } + Err(error) => Err(error), + } } #[derive(Clone, Dupe, Copy)] @@ -285,6 +359,8 @@ pub struct MaterializationQueueTrackerHolder(Arc>); pub trait HasMaterializationQueueTracker { fn init_materialization_queue_tracker(&mut self); + fn clear_materialization_queue_tracker(&self); + fn get_materialization_queue_tracker(&self) -> Arc>; } @@ -295,6 +371,14 @@ impl HasMaterializationQueueTracker for UserComputationData { ))); } + fn clear_materialization_queue_tracker(&self) { + self.data + .get::() + .expect("MaterializationQueueTracker should be set") + .0 + .clear(); + } + fn get_materialization_queue_tracker(&self) -> Arc> { self.data .get::() @@ -303,3 +387,49 @@ impl HasMaterializationQueueTracker for UserComputationData { .dupe() } } + +#[cfg(test)] +mod tests { + use buck2_artifact::actions::key::ActionIndex; + use buck2_artifact::artifact::artifact_type::Artifact; + use buck2_artifact::artifact::artifact_type::testing::BuildArtifactTestingExt; + use buck2_core::configuration::data::ConfigurationData; + use buck2_core::target::configured_target_label::ConfiguredTargetLabel; + use buck2_execute::digest_config::DigestConfig; + + use super::*; + + fn build_artifact(path: &str) -> BuildArtifact { + let target = + ConfiguredTargetLabel::testing_parse("cell//pkg:foo", ConfigurationData::testing_new()); + BuildArtifact::testing_new(target, path, ActionIndex::new(0)) + } + + #[test] + fn remove_materialization_queue_entries_clears_build_artifacts() { + let artifact = build_artifact("out"); + let values = ArtifactGroupValues::from_artifact( + Artifact::from(artifact.dupe()), + ArtifactValue::file(DigestConfig::testing_default().empty_file()), + ); + let queue_tracker = Arc::new(BuckDashSet::default()); + queue_tracker.insert(artifact.dupe()); + + remove_materialization_queue_entries(&values, &queue_tracker); + + assert!(!queue_tracker.contains(&artifact)); + } + + #[test] + fn clear_materialization_queue_tracker_drops_all_entries() { + let artifact = build_artifact("out"); + let mut data = UserComputationData::new(); + data.init_materialization_queue_tracker(); + let queue_tracker = data.get_materialization_queue_tracker(); + queue_tracker.insert(artifact.dupe()); + + data.clear_materialization_queue_tracker(); + + assert!(!queue_tracker.contains(&artifact)); + } +} diff --git a/app/buck2_common/src/pattern/resolve.rs b/app/buck2_common/src/pattern/resolve.rs index da056c3b2c246..e191011909c37 100644 --- a/app/buck2_common/src/pattern/resolve.rs +++ b/app/buck2_common/src/pattern/resolve.rs @@ -30,7 +30,7 @@ use crate::pattern::package_roots::find_package_roots; /// Pattern where `foo/...` is expanded to matching packages. /// Targets are not validated yet, and `:` is not yet expanded. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ResolvedPattern { pub specs: BuckIndexMap>, } diff --git a/app/buck2_core/src/pattern/pattern.rs b/app/buck2_core/src/pattern/pattern.rs index 380d83cedc9b4..5e3d9a80df76e 100644 --- a/app/buck2_core/src/pattern/pattern.rs +++ b/app/buck2_core/src/pattern/pattern.rs @@ -1264,7 +1264,7 @@ where })) } -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum PackageSpec { /// Given targets in a package. Targets(Vec<(TargetName, T)>), diff --git a/app/buck2_execute/src/execute/prepared.rs b/app/buck2_execute/src/execute/prepared.rs index 9a8e9b5f347cb..9340e39b7209b 100644 --- a/app/buck2_execute/src/execute/prepared.rs +++ b/app/buck2_execute/src/execute/prepared.rs @@ -48,6 +48,7 @@ pub struct PreparedCommand<'a, 'b> { pub target: &'b dyn CommandExecutionTarget, pub prepared_action: &'a PreparedAction, pub digest_config: DigestConfig, + pub force_skip_cache_read: bool, } #[async_trait] diff --git a/app/buck2_execute/src/execute/testing_dry_run.rs b/app/buck2_execute/src/execute/testing_dry_run.rs index 197f9621239cf..ea46332e8403e 100644 --- a/app/buck2_execute/src/execute/testing_dry_run.rs +++ b/app/buck2_execute/src/execute/testing_dry_run.rs @@ -64,6 +64,7 @@ impl PreparedCommandExecutor for DryRunExecutor { target: _target, prepared_action: _prepared_action, digest_config, + force_skip_cache_read: _, } = command; let manager = manager.claim().await; diff --git a/app/buck2_execute/src/materialize/materializer.rs b/app/buck2_execute/src/materialize/materializer.rs index adffdc505b15f..085a860f24f6b 100644 --- a/app/buck2_execute/src/materialize/materializer.rs +++ b/app/buck2_execute/src/materialize/materializer.rs @@ -8,11 +8,13 @@ * above-listed licenses. */ +use std::any::Any; use std::fmt; use std::sync::Arc; use allocative::Allocative; use async_trait::async_trait; +use buck2_common::file_ops::metadata::FileDigest; use buck2_common::file_ops::metadata::FileMetadata; use buck2_core::deferred::base_deferred_key::BaseDeferredKey; use buck2_core::execution_types::executor_config::RemoteExecutorUseCase; @@ -22,6 +24,7 @@ use buck2_core::fs::project_rel_path::ProjectRelativePathBuf; use buck2_directory::directory::directory_iterator::DirectoryIterator; use buck2_directory::directory::entry::DirectoryEntry; use buck2_directory::directory::walk::ordered_entry_walk; +use buck2_error::TypedContext; use buck2_events::dispatch::EventDispatcher; use chrono::DateTime; use chrono::Duration; @@ -112,6 +115,68 @@ pub struct CasNotFoundError { pub error: Arc, } +#[derive(Allocative, Clone, Debug, Eq, PartialEq)] +pub struct ReLostInput { + pub path: ProjectRelativePathBuf, + pub digest: FileDigest, +} + +impl TypedContext for ReLostInput { + fn eq(&self, other: &dyn TypedContext) -> bool { + (other as &dyn Any).downcast_ref::() == Some(self) + } + + fn display(&self) -> Option { + Some(format!("{self:?}")) + } +} + +#[derive(Allocative, Clone, Debug, Eq, PartialEq)] +pub struct ReLostInputs { + inputs: Vec, +} + +impl ReLostInputs { + pub fn new(inputs: Vec) -> Self { + Self { inputs } + } + + pub fn from_cas_not_found(source: &CasNotFoundError) -> Self { + let inputs = ordered_entry_walk(source.directory.as_ref()) + .filter_map(|entry| match entry { + DirectoryEntry::Leaf(ActionDirectoryMember::File(file)) => { + Some(file.digest.data().clone()) + } + DirectoryEntry::Dir(_) | DirectoryEntry::Leaf(_) => None, + }) + .with_paths() + .map(|(entry_path, digest)| ReLostInput { + path: source.path.join(entry_path), + digest, + }) + .collect(); + Self { inputs } + } + + pub fn is_empty(&self) -> bool { + self.inputs.is_empty() + } + + pub fn inputs(&self) -> &[ReLostInput] { + &self.inputs + } +} + +impl TypedContext for ReLostInputs { + fn eq(&self, other: &dyn TypedContext) -> bool { + (other as &dyn Any).downcast_ref::() == Some(self) + } + + fn display(&self) -> Option { + Some(format!("{self:?}")) + } +} + #[derive(buck2_error::Error, Debug)] #[buck2(tag = MaterializationError)] pub enum MaterializationError { diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index e773646038c2b..64f259df5d44f 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -9,9 +9,12 @@ */ use std::collections::BTreeMap; +use std::str::FromStr; use std::sync::Arc; use std::sync::LazyLock; +use std::sync::Mutex; use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; @@ -37,6 +40,7 @@ use buck2_fs::error::IoResultExt; use buck2_fs::fs_util; use buck2_fs::paths::abs_norm_path::AbsNormPath; use buck2_hash::StdBuckHashMap; +use buck2_hash::StdBuckHashSet; #[cfg(fbcode_build)] use buck2_re_configuration::CASdMode; use buck2_re_configuration::RemoteExecutionStaticMetadataImpl; @@ -641,6 +645,9 @@ static INDUCED_CACHE_MISSES: LazyLock> } }); +static TEST_FAIL_RE_EXECUTE_MISSING_INPUTS_ONCE: AtomicBool = AtomicBool::new(false); +static TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE: LazyLock>> = + LazyLock::new(|| Mutex::new(StdBuckHashSet::default())); impl RemoteExecutionClientImpl { async fn new(re_config: &RemoteExecutionConfig) -> buck2_error::Result { let op_name = "REClientBuilder"; @@ -1056,7 +1063,7 @@ impl RemoteExecutionClientImpl { ) -> buck2_error::Result> { if let Some(m) = &*INDUCED_CACHE_MISSES { if m.get(&action_digest.to_string()) - .is_some_and(|b| !b.load(std::sync::atomic::Ordering::Relaxed)) + .is_some_and(|b| !b.load(Ordering::Relaxed)) { return Ok(None); } @@ -1507,7 +1514,7 @@ impl RemoteExecutionClientImpl { let induced_cache_miss = if let Some(m) = &*INDUCED_CACHE_MISSES { m.get(&action_digest.to_string()) - .filter(|v| !v.load(std::sync::atomic::Ordering::Relaxed)) + .filter(|v| !v.load(Ordering::Relaxed)) } else { None }; @@ -1661,6 +1668,28 @@ impl RemoteExecutionClientImpl { ..Default::default() }; let re_action = format!("Execute with digest {}", &action_digest); + if buck2_env!( + "BUCK2_TEST_FAIL_RE_EXECUTE_MISSING_INPUTS_ONCE", + bool, + applicability = testing + )? && identity.paths.input_files_bytes() > 0 + && !TEST_FAIL_RE_EXECUTE_MISSING_INPUTS_ONCE.swap(true, Ordering::Relaxed) + { + return Ok(ExecuteResponseOrCancelled::Response( + ExecuteResponseWithQueueStats { + execute_response: remote_execution::ExecuteResponse { + status: remote_execution::TStatus { + code: TCode::FAILED_PRECONDITION, + message: "Missing CAS input digest injected by test".to_owned(), + ..Default::default() + }, + action_digest: action_digest.to_re(), + ..Default::default() + }, + queue_stats: QueueStats::default(), + }, + )); + } let res = with_error_handler( re_action.as_str(), self.get_session_id(), @@ -1679,7 +1708,7 @@ impl RemoteExecutionClientImpl { .await; if let Some(induced_cache_miss) = induced_cache_miss { - induced_cache_miss.store(true, std::sync::atomic::Ordering::Relaxed); + induced_cache_miss.store(true, Ordering::Relaxed); } let trace = match &res { @@ -1876,6 +1905,13 @@ impl RemoteExecutionClientImpl { } }; + if let Some(digest) = should_fail_re_download(&chunk)? { + return Err(test_re_error( + &format!("Injected missing CAS download for {digest}"), + TCode::NOT_FOUND, + )); + } + let response = with_error_handler( "materialize_files", self.get_session_id(), @@ -2028,6 +2064,64 @@ impl RemoteExecutionClientImpl { } } +fn should_fail_re_download( + files: &[NamedDigestWithPermissions], +) -> buck2_error::Result> { + fn convert_digests(val: &str) -> buck2_error::Result> { + val.split_whitespace() + .map(|digest| { + TDigest::from_str(digest) + .map_err(|e| from_any_with_tag(e, buck2_error::ErrorTag::InvalidDigest)) + .with_buck_error_context(|| format!("Invalid digest: {digest}")) + }) + .collect() + } + + let injected_digests_file = buck2_env!( + "BUCK2_TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE_FILE", + applicability = testing + )?; + if let Some(path) = injected_digests_file { + let injected_digests = std::fs::read_to_string(&path) + .with_buck_error_context(|| format!("Failed to read `{path}`"))?; + for injected_digest in convert_digests(&injected_digests)? { + if should_fail_digest_once(files, &injected_digest) { + return Ok(Some(injected_digest)); + } + } + } + + let injected_digests = buck2_env!( + "BUCK2_TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE", + type=Vec, + converter=convert_digests, + applicability=testing + )?; + + let Some(injected_digests) = injected_digests else { + return Ok(None); + }; + + for injected_digest in injected_digests { + if should_fail_digest_once(files, &injected_digest) { + return Ok(Some(injected_digest.clone())); + } + } + + Ok(None) +} + +fn should_fail_digest_once(files: &[NamedDigestWithPermissions], digest: &TDigest) -> bool { + if !files.iter().any(|file| file.named_digest.digest == *digest) { + return false; + } + + TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE + .lock() + .expect("Poisoned lock") + .insert(digest.to_string()) +} + /// Drop the REClient on a blocking thread. The REClient destructor does a blocking wait on async /// calls (it tells the server to cancel its calls, but it waits for an ack), so we shouldn't drop /// it on a runtime thread. diff --git a/app/buck2_execute/src/re/uploader.rs b/app/buck2_execute/src/re/uploader.rs index 667edd5a0e8d5..377785fcd7409 100644 --- a/app/buck2_execute/src/re/uploader.rs +++ b/app/buck2_execute/src/re/uploader.rs @@ -61,8 +61,9 @@ use crate::directory::ActionImmutableDirectory; use crate::directory::ReDirectorySerializer; use crate::execute::blobs::ActionBlobs; use crate::materialize::materializer::ArtifactNotMaterializedReason; -use crate::materialize::materializer::CasDownloadInfo; use crate::materialize::materializer::Materializer; +use crate::materialize::materializer::ReLostInput; +use crate::materialize::materializer::ReLostInputs; use crate::re::action_identity::ReActionIdentity; use crate::re::client::RemoteExecutionClient; use crate::re::error::with_error_handler; @@ -120,9 +121,23 @@ impl Uploader { } }; + let mut injectable_input_digests = input_digests.clone(); + { + for entry in input_dir.unordered_walk().without_paths() { + let digest = match entry { + DirectoryEntry::Dir(d) => d.as_fingerprinted_dyn().fingerprint(), + DirectoryEntry::Leaf(ActionDirectoryMember::File(f)) => &f.digest, + DirectoryEntry::Leaf(..) => continue, + }; + injectable_input_digests.insert(digest); + } + + injectable_input_digests.insert(input_dir.fingerprint()); + }; + let mut upload_blobs = Vec::new(); let mut missing_digests = StdBuckHashSet::default(); - add_injected_missing_digests(&input_digests, &mut missing_digests)?; + add_injected_missing_digests(&injectable_input_digests, &mut missing_digests)?; let digests_and_ttls_iterator = if deduplicate_get_digests_ttl_calls { let (fut, reqs, new) = { @@ -264,6 +279,7 @@ impl Uploader { // Track what files should be materialized before we upload. let mut paths_to_materialize = Vec::new(); + let mut lost_inputs = Vec::new(); if !missing_digests.is_empty() { let mut upload_file_paths = Vec::new(); @@ -324,8 +340,8 @@ impl Uploader { } Err( ref err @ ArtifactNotMaterializedReason::RequiresCasDownload { + ref path, ref entry, - ref info, .. }, ) => { @@ -337,30 +353,10 @@ impl Uploader { // won't be here. On the flip side, if a digest has been in the CAS for // a very long time, it might have expired. if file.digest.to_re() == digest { - if should_error_for_missing_digest(info) { - soft_error!( - "cas_missing_fatal", - buck2_error::buck2_error!( - buck2_error::ErrorTag::Input, - "{} missing (origin: {})", - file.digest, - info.origin.as_display_for_not_found(), - ), - daemon_in_memory_state_is_corrupted: true, - action_cache_is_corrupted: info.origin.guaranteed_by_action_cache() - )?; - - return Err(buck2_error::buck2_error!( - buck2_error::ErrorTag::ReCasArtifactExpired, - "Your build requires an artifact that has expired in the RE CAS \ - and Buck does not have it. This likely happened because your Buck daemon \ - has been online for a long time. This error is currently unrecoverable. \ - To proceed, you should restart Buck using `buck2 killall`. \ - Debug information: {:#}", - err - )); - } - + let lost_input = ReLostInput { + path: path.clone(), + digest: file.digest.data().clone(), + }; soft_error!( "cas_missing", buck2_error::buck2_error!( @@ -371,8 +367,10 @@ impl Uploader { err ), quiet: true - )?; + ) + .map_err(|e| e.context(lost_input.clone()))?; + lost_inputs.push(lost_input); continue; } } @@ -398,6 +396,10 @@ impl Uploader { } } + if !lost_inputs.is_empty() { + return Err(error_for_lost_cas_inputs(lost_inputs)); + } + if !paths_to_materialize.is_empty() { materializer .ensure_materialized(paths_to_materialize) @@ -475,23 +477,6 @@ impl Uploader { } } -fn should_error_for_missing_digest(info: &CasDownloadInfo) -> bool { - // RE sometimes reports things that exist as missing. We don't fully understand why at this - // time and this is being investigated, but we know that RE normally ensures that anything it - // returns to us will last for another 6 hours at least. So, we silence all errors when the - // download info is less than 5 hours, because we know those are most likely bogus. This - // basically means that after 5 hours we might tell the user to restart even though they don't - // need to. However, the alternative is confused users who see errors after a couple days (we - // had 3 reports of this in a week), so for now we do accept some false positives (when RE - // tells us a digest doesn't exist even though it does) in order to provide better UX when we - // hit a true positive. - if let Some(age) = info.action_age() { - age >= Duration::seconds(3600 * 5) - } else { - true - } -} - fn directory_to_blob<'a, D>(d: D) -> InlinedBlobWithDigest where D: ActionFingerprintedDirectoryRef<'a>, @@ -503,6 +488,28 @@ where } } +fn error_for_lost_cas_inputs(lost_inputs: Vec) -> buck2_error::Error { + let mut error = match lost_inputs.as_slice() { + [lost_input] => buck2_error::buck2_error!( + buck2_error::ErrorTag::ReNotFound, + "Action execution requires artifact `{}` but it is missing from remote CAS", + lost_input.digest, + ), + lost_inputs => buck2_error::buck2_error!( + buck2_error::ErrorTag::ReNotFound, + "Action execution requires {} artifacts that are missing from remote CAS", + lost_inputs.len(), + ), + } + .context(ReLostInputs::new(lost_inputs.clone())); + + for lost_input in lost_inputs { + error = error.context(lost_input); + } + + error +} + fn error_for_missing_file( digest: &TDigest, cause: &ArtifactNotMaterializedReason, @@ -524,7 +531,7 @@ fn add_injected_missing_digests<'a>( missing_digests: &mut StdBuckHashSet<&'a TrackedFileDigest>, ) -> buck2_error::Result<()> { fn convert_digests(val: &str) -> buck2_error::Result> { - val.split(' ') + val.split_whitespace() .map(|digest| { let digest = TDigest::from_str(digest) .map_err(|e| from_any_with_tag(e, buck2_error::ErrorTag::InvalidDigest)) @@ -536,6 +543,26 @@ fn add_injected_missing_digests<'a>( .collect() } + fn add_digests_once<'a>( + input_digests: &StdBuckHashSet<&'a TrackedFileDigest>, + missing_digests: &mut StdBuckHashSet<&'a TrackedFileDigest>, + digests: Vec, + ) { + static INJECTED_ONCE: LazyLock>> = + LazyLock::new(|| Mutex::new(StdBuckHashSet::default())); + + for d in digests { + let matched = input_digests.get(&d); + if let Some(i) = matched { + let mut injected_once = INJECTED_ONCE.lock().expect("Poisoned lock"); + if !injected_once.contains(&d) { + injected_once.insert(d.clone()); + missing_digests.insert(i); + } + } + } + } + let ingested_digests = buck2_env!( "BUCK2_TEST_INJECTED_MISSING_DIGESTS", type=Vec, @@ -550,6 +577,30 @@ fn add_injected_missing_digests<'a>( } } + let injected_digests_file = buck2_env!( + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE_FILE", + applicability = testing + )?; + if let Some(path) = injected_digests_file { + let injected_digests = std::fs::read_to_string(&path) + .with_buck_error_context(|| format!("Failed to read {path}"))?; + add_digests_once( + input_digests, + missing_digests, + convert_digests(&injected_digests)?, + ); + } + + let ingested_digests_once = buck2_env!( + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE", + type=Vec, + converter=convert_digests, + applicability=testing + )?; + if let Some(digests) = ingested_digests_once { + add_digests_once(input_digests, missing_digests, digests.to_vec()); + } + Ok(()) } diff --git a/app/buck2_execute_impl/src/executors/local.rs b/app/buck2_execute_impl/src/executors/local.rs index 95e0b0c0777c7..3e5a182fd53fd 100644 --- a/app/buck2_execute_impl/src/executors/local.rs +++ b/app/buck2_execute_impl/src/executors/local.rs @@ -71,6 +71,7 @@ use buck2_execute::materialize::materializer::CopiedArtifact; use buck2_execute::materialize::materializer::DeclareArtifactPayload; use buck2_execute::materialize::materializer::MaterializationError; use buck2_execute::materialize::materializer::Materializer; +use buck2_execute::materialize::materializer::ReLostInputs; use buck2_execute_local::CommandResult; use buck2_execute_local::DefaultKillProcess; use buck2_execute_local::GatherOutputStatus; @@ -1250,6 +1251,7 @@ impl PreparedCommandExecutor for LocalExecutor { target: _, prepared_action, digest_config, + force_skip_cache_read: _, } = command; manager.start_waiting_category(WaitingCategory::LocalQueued); @@ -1433,15 +1435,22 @@ pub async fn materialize_inputs( Ok(()) => {} Err(MaterializationError::NotFound { source }) => { let corrupted = source.info.origin.guaranteed_by_action_cache(); + let lost_inputs = ReLostInputs::from_cas_not_found(&source); - return Err(tag_error!( + let error = tag_error!( "cas_missing_fatal", MaterializationError::NotFound { source }.into(), quiet: true, task: false, daemon_in_memory_state_is_corrupted: true, action_cache_is_corrupted: corrupted - )); + ); + + return Err(if lost_inputs.is_empty() { + error + } else { + error.context(lost_inputs) + }); } Err(e) => { return Err(e.into()); diff --git a/app/buck2_execute_impl/src/executors/re.rs b/app/buck2_execute_impl/src/executors/re.rs index b83bfc2d10b0b..e9acfff71666e 100644 --- a/app/buck2_execute_impl/src/executors/re.rs +++ b/app/buck2_execute_impl/src/executors/re.rs @@ -171,6 +171,7 @@ impl ReExecutor { re_gang_workers: &[buck2_core::execution_types::executor_config::ReGangWorker], meta_internal_extra_params: &MetaInternalExtraParams, worker_tool_action_digest: Option, + force_skip_cache_read: bool, ) -> ControlFlow { info!( @@ -188,7 +189,7 @@ impl ReExecutor { re_gang_workers, identity, &mut manager, - self.skip_cache_read, + self.skip_cache_read || force_skip_cache_read, self.skip_cache_write, self.re_max_queue_time, self.re_resource_units, @@ -378,6 +379,7 @@ impl PreparedCommandExecutor for ReExecutor { network_access: _, }, digest_config, + force_skip_cache_read, } = command; let details = RemoteCommandExecutionDetails::new( @@ -454,6 +456,7 @@ impl PreparedCommandExecutor for ReExecutor { &re_gang_workers, command.request.meta_internal_extra_params(), worker_tool_action_digest, + *force_skip_cache_read, ) .await?; diff --git a/app/buck2_server/src/ctx.rs b/app/buck2_server/src/ctx.rs index a02cd6b35b586..2cf4c58060377 100644 --- a/app/buck2_server/src/ctx.rs +++ b/app/buck2_server/src/ctx.rs @@ -22,6 +22,7 @@ use buck2_build_api::actions::execute::dice_data::SetReClient; use buck2_build_api::actions::execute::dice_data::set_fallback_executor_config; use buck2_build_api::actions::impls::run_action_knobs::HasRunActionKnobs; use buck2_build_api::actions::impls::run_action_knobs::RunActionKnobs; +use buck2_build_api::actions::rewind::HasActionRewindTracker; use buck2_build_api::build::HasCreateUnhashedSymlinkLock; use buck2_build_api::build::detailed_aggregated_metrics::dice::HasDetailedAggregatedMetrics; use buck2_build_api::build::detailed_aggregated_metrics::dice::SetDetailedAggregatedMetricsEventsHolder; @@ -911,6 +912,7 @@ impl DiceCommandUpdater<'_, '_> { data.set_http_client(self.cmd_ctx.base_context.daemon.http_client.dupe()); data.set_materializer(self.cmd_ctx.base_context.daemon.materializer.dupe()); data.init_materialization_queue_tracker(); + data.init_action_rewind_tracker(); data.set_build_signals(self.build_signals.build_signals.dupe()); data.set_run_action_knobs(run_action_knobs); data.set_create_unhashed_symlink_lock( diff --git a/app/buck2_server_commands/src/build.rs b/app/buck2_server_commands/src/build.rs index 8f092936be979..65f3072749deb 100644 --- a/app/buck2_server_commands/src/build.rs +++ b/app/buck2_server_commands/src/build.rs @@ -13,7 +13,12 @@ use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; +use buck2_artifact::actions::key::ActionKey; use buck2_build_api::actions::artifact::get_artifact_fs::GetArtifactFs; +use buck2_build_api::actions::calculation::ActionCalculation; +use buck2_build_api::actions::calculation::BuildKey; +use buck2_build_api::actions::rewind::ActionRewindRequest; +use buck2_build_api::actions::rewind::HasActionRewindTracker; use buck2_build_api::build; use buck2_build_api::build::AsyncBuildTargetResultBuilder; use buck2_build_api::build::BuildEvent; @@ -32,6 +37,7 @@ use buck2_build_api::build::detailed_aggregated_metrics::types::ActionGraphSketc use buck2_build_api::build::detailed_aggregated_metrics::types::ArtifactPathSketchResult; use buck2_build_api::build::detailed_aggregated_metrics::types::DetailedAggregatedMetrics; use buck2_build_api::build::graph_properties::GraphPropertiesOptions; +use buck2_build_api::materialize::HasMaterializationQueueTracker; use buck2_build_api::materialize::MaterializationAndUploadContext; use buck2_cli_proto::CommonBuildOptions; use buck2_cli_proto::build_request::BuildProviders; @@ -143,6 +149,8 @@ fn expect_build_opts(req: &buck2_cli_proto::BuildRequest) -> &CommonBuildOptions req.build_opts.as_ref().expect("should have build options") } +const MAX_REPEATED_ACTION_REWINDS: usize = 20; + #[derive(buck2_error::Error, Debug)] #[buck2(tag = Input)] #[error( @@ -369,7 +377,7 @@ async fn build( .map(|s| s.into_build_provider_type()) .collect(); - let (streaming_build_result_tx, streaming_build_result_rx) = + let (streaming_build_result_tx, mut streaming_build_result_rx) = tokio::sync::mpsc::unbounded_channel(); // Avoid computing and generating streaming build results if we don't have to let build_command_streaming_build_result_tx = if !build_opts @@ -386,36 +394,61 @@ async fn build( .as_ref() .is_some_and(|o| o.return_run_args); let build_start = Instant::now(); - let cloned_ctx = ctx.clone(); // build_future does a mutable borrow on the context, so we clone it first - let build_future = ctx.with_linear_recompute(|ctx| async move { - build_targets( - &ctx, - resolved_pattern, - target_resolution_config, - build_providers, - (final_artifact_materializations, final_artifact_uploads).into(), - build_opts.fail_fast, - MissingTargetBehavior::from_skip(build_opts.skip_missing_targets), - build_opts.skip_incompatible_targets, + let mut rewinds = 0; + let build_result = loop { + let cloned_ctx = ctx.clone(); // build_future does a mutable borrow on the context, so we clone it first + let build_providers = build_providers.dupe(); + let build_command_streaming_build_result_tx = + build_command_streaming_build_result_tx.clone(); + let resolved_pattern = &resolved_pattern; + let target_resolution_config = &target_resolution_config; + let timeout_observer = timeout_observer.as_ref(); + let build_future = ctx.with_linear_recompute(|ctx| async move { + build_targets( + &ctx, + &resolved_pattern, + &target_resolution_config, + build_providers, + (final_artifact_materializations, final_artifact_uploads).into(), + build_opts.fail_fast, + MissingTargetBehavior::from_skip(build_opts.skip_missing_targets), + build_opts.skip_incompatible_targets, + graph_properties.dupe(), + return_run_args, + timeout_observer, + build_command_streaming_build_result_tx, + build_start, + ) + .await + }); + + let build_result = maybe_stream_build_reports( + build_future, + build_opts, + cloned_ctx, graph_properties.dupe(), - return_run_args, - timeout_observer.as_ref(), - build_command_streaming_build_result_tx, - build_start, + server_ctx, + request, + &mut streaming_build_result_rx, ) - .await - }); + .await?; - let build_result = maybe_stream_build_reports( - build_future, - build_opts, - cloned_ctx, - graph_properties.dupe(), - server_ctx, - request, - streaming_build_result_rx, - ) - .await?; + let Some(rewind) = find_action_rewind_request(&build_result) else { + break build_result; + }; + + if rewinds >= MAX_REPEATED_ACTION_REWINDS { + break build_result; + } + rewinds += 1; + + tracing::info!( + rewind_count = rewinds, + rewind_actions = rewind.action_keys().len(), + "Rewinding DICE action graph after lost remote CAS input" + ); + ctx = apply_action_rewind(ctx, rewind).await?; + }; let want_detailed_metrics = ctx .parse_legacy_config_property( @@ -569,7 +602,7 @@ async fn maybe_stream_build_reports( graph_properties: GraphPropertiesOptions, server_ctx: &dyn ServerCommandContextTrait, request: &buck2_cli_proto::BuildRequest, - mut streaming_build_result_rx: tokio::sync::mpsc::UnboundedReceiver, + streaming_build_result_rx: &mut tokio::sync::mpsc::UnboundedReceiver, ) -> buck2_error::Result { if build_opts .unstable_streaming_build_report_filename @@ -622,6 +655,76 @@ async fn maybe_stream_build_reports( } } +fn find_action_rewind_request(build_result: &BuildTargetResult) -> Option { + fn merge_rewind(request: &mut Option, error: &buck2_error::Error) { + let Some(found) = error.find_typed_context::() else { + return; + }; + + match request { + Some(request) => request.merge(&found), + None => *request = Some((*found).clone()), + } + } + + let mut request = None; + for errors in build_result.other_errors.values() { + for error in errors { + merge_rewind(&mut request, error); + } + } + + for configured in build_result.configured.values().flatten() { + for error in &configured.errors { + merge_rewind(&mut request, &error.inner); + } + for output in &configured.outputs { + if let Err(error) = &output.inner { + merge_rewind(&mut request, error); + } + } + } + + request +} + +async fn apply_action_rewind( + mut ctx: DiceTransaction, + rewind: ActionRewindRequest, +) -> buck2_error::Result { + let action_keys = canonical_rewind_action_keys(&mut ctx, rewind.into_action_keys()).await?; + let mut updater = ctx.into_updater(); + updater.changed( + action_keys + .iter() + .cloned() + .map(BuildKey) + .collect::>(), + )?; + let ctx = updater.commit().await; + ctx.per_transaction_data() + .clear_materialization_queue_tracker(); + ctx.per_transaction_data().set_rewound_actions(&action_keys); + Ok(ctx) +} + +async fn canonical_rewind_action_keys( + ctx: &mut DiceTransaction, + action_keys: Vec, +) -> buck2_error::Result> { + let mut canonical_action_keys = Vec::with_capacity(action_keys.len()); + for action_key in action_keys { + let action = ActionCalculation::get_action(ctx, &action_key).await?; + if !canonical_action_keys + .iter() + .any(|existing| existing == action.key()) + { + canonical_action_keys.push(action.key().dupe()); + } + } + Ok(canonical_action_keys) +} + async fn process_build_result( server_ctx: &dyn ServerCommandContextTrait, mut ctx: DiceTransaction, @@ -729,8 +832,8 @@ async fn process_build_result( async fn build_targets( ctx: &LinearRecomputeDiceComputations<'_>, - spec: ResolvedPattern, - target_resolution_config: TargetResolutionConfig, + spec: &ResolvedPattern, + target_resolution_config: &TargetResolutionConfig, build_providers: Arc, materialization_and_upload: MaterializationAndUploadContext, fail_fast: bool, @@ -746,14 +849,14 @@ async fn build_targets( AsyncBuildTargetResultBuilder::new(streaming_build_result_tx, build_start); let fut = match target_resolution_config { TargetResolutionConfig::Default(global_cfg_options) => { - let spec = spec.convert_pattern().buck_error_context( + let spec = spec.clone().convert_pattern().buck_error_context( "Targets with explicit configuration can only be built when the `--target-universe=` flag is provided", )?; build_targets_with_global_target_platform( &consumer, ctx, spec, - global_cfg_options, + global_cfg_options.dupe(), build_providers, materialization_and_upload, missing_target_behavior, @@ -784,8 +887,8 @@ async fn build_targets( async fn build_targets_in_universe( event_consumer: &dyn BuildEventConsumer, ctx: &LinearRecomputeDiceComputations<'_>, - spec: ResolvedPattern, - universe: CqueryUniverse, + spec: &ResolvedPattern, + universe: &CqueryUniverse, build_providers: Arc, materialization_and_upload: MaterializationAndUploadContext, graph_properties: GraphPropertiesOptions, diff --git a/app/buck2_test/src/orchestrator.rs b/app/buck2_test/src/orchestrator.rs index 5786af19956da..ab637aa7e9f27 100644 --- a/app/buck2_test/src/orchestrator.rs +++ b/app/buck2_test/src/orchestrator.rs @@ -1221,6 +1221,7 @@ impl BuckTestOrchestrator<'_> { request: &request, prepared_action: &prepared_action, digest_config, + force_skip_cache_read: false, }; // instrument execution with a span. @@ -1973,6 +1974,7 @@ impl BuckTestOrchestrator<'_> { request: &context.execution_request, prepared_action: &prepared_action, digest_config, + force_skip_cache_read: false, }; let command = executor.exec_cmd(manager, &prepared_command, cancellation); diff --git a/tests/core/executor/test_action_rewinding.py b/tests/core/executor/test_action_rewinding.py new file mode 100644 index 0000000000000..99222791e22b4 --- /dev/null +++ b/tests/core/executor/test_action_rewinding.py @@ -0,0 +1,461 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is dual-licensed under either the MIT license found in the +# LICENSE-MIT file in the root directory of this source tree or the Apache +# License, Version 2.0 found in the LICENSE-APACHE file in the root directory +# of this source tree. You may select, at your option, one of the +# above-listed licenses. + +# pyre-strict + +import hashlib +from pathlib import Path + +from buck2.tests.e2e_util.api.buck import Buck +from buck2.tests.e2e_util.buck_workspace import buck_test +from buck2.tests.e2e_util.helper.utils import filter_events +from buck2.tests.e2e_util.helper.utils import read_what_ran + + +def _digest(content: str) -> str: + return f"{hashlib.sha1(content.encode()).hexdigest()}:{len(content)}" + + +PRODUCER_CONTENT = "generated input for action rewind\n" +PRODUCER_DIGEST = _digest(PRODUCER_CONTENT) +FIRST_PRODUCER_CONTENT = "first generated input for action rewind\n" +FIRST_PRODUCER_DIGEST = _digest(FIRST_PRODUCER_CONTENT) +SECOND_PRODUCER_CONTENT = "second generated input for action rewind\n" +SECOND_PRODUCER_DIGEST = _digest(SECOND_PRODUCER_CONTENT) +TREE_PRODUCER_CONTENT = "tree generated input for action rewind\n" +TREE_PRODUCER_DIGEST = _digest(TREE_PRODUCER_CONTENT) +TOP_LEVEL_OUTPUT_CONTENT = "top-level generated output for action rewind\n" +TOP_LEVEL_OUTPUT_DIGEST = _digest(TOP_LEVEL_OUTPUT_CONTENT) + +CONSUMER_CONTENT = f"consumer saw: {PRODUCER_CONTENT}" +CONSUMER_DIGEST = _digest(CONSUMER_CONTENT) +CONSUMER_TWO_CONTENT = ( + f"multi consumer saw:\n{FIRST_PRODUCER_CONTENT}{SECOND_PRODUCER_CONTENT}" +) +CONSUMER_TWO_DIGEST = _digest(CONSUMER_TWO_CONTENT) +TREE_CONSUMER_CONTENT = f"tree consumer saw: {TREE_PRODUCER_CONTENT}" +TREE_CONSUMER_DIGEST = _digest(TREE_CONSUMER_CONTENT) +TREE_DIR_CONSUMER_CONTENT = f"tree dir consumer saw: {TREE_PRODUCER_CONTENT}" +TREE_DIR_CONSUMER_DIGEST = _digest(TREE_DIR_CONSUMER_CONTENT) +MANY_INPUT_COUNT = 25 +MANY_PRODUCER_CONTENTS = [ + f"many generated input {i} for action rewind\n" for i in range(MANY_INPUT_COUNT) +] +MANY_PRODUCER_DIGESTS = [_digest(content) for content in MANY_PRODUCER_CONTENTS] +CONSUMER_MANY_CONTENT = "many consumer saw:\n" + "".join(MANY_PRODUCER_CONTENTS) +CONSUMER_MANY_DIGEST = _digest(CONSUMER_MANY_CONTENT) + +TARGET = "root//:consumer" +LOCAL_INPUT_CONSUMER_TARGET = "root//:local_input_consumer" +CONSUMER_TWO_TARGET = "root//:consumer_two" +CONSUMER_MANY_TARGET = "root//:consumer_many" +TREE_CONSUMER_TARGET = "root//:tree_consumer" +TREE_DIR_CONSUMER_TARGET = "root//:tree_dir_consumer" +TOP_LEVEL_OUTPUT_TARGET = "root//:top_level_output" +NONDETERMINISTIC_PRODUCER_TARGET = "root//:nondeterministic_producer" +NONDETERMINISTIC_CONSUMER_TARGET = "root//:nondeterministic_consumer" +NONDETERMINISTIC_MIDDLE_TARGET = "root//:nondeterministic_middle" +NONDETERMINISTIC_CHAIN_CONSUMER_TARGET = "root//:nondeterministic_chain_consumer" + +REMOTE_ARGS = [ + "--remote-only", + "--no-remote-cache", +] + + +async def _restart_with_test_env(buck: Buck, env: dict[str, str]) -> None: + for key, value in env.items(): + buck.set_env(key, value) + await buck.kill() + + +async def _assert_remote_actions_ran( + buck: Buck, + expected_identity_fragments: list[str], +) -> None: + what_ran = await read_what_ran(buck) + actions = { + entry["identity"]: entry["reproducer"]["executor"] + for entry in what_ran + if "reproducer" in entry + } + + for fragment in expected_identity_fragments: + assert any(fragment in identity for identity in actions), actions + assert all(executor == "Re" for executor in actions.values()), actions + + +async def _assert_actions_ran_on( + buck: Buck, + expected_executors: dict[str, str], +) -> None: + what_ran = await read_what_ran(buck) + actions = { + entry["identity"]: entry["reproducer"]["executor"] + for entry in what_ran + if "reproducer" in entry + } + + for fragment, expected_executor in expected_executors.items(): + matching_executors = [ + executor for identity, executor in actions.items() if fragment in identity + ] + assert matching_executors, actions + assert expected_executor in matching_executors, actions + + +async def _seed_stale_digest_from_action_event( + buck: Buck, + fail_digests_file: Path, + target: str, + output_path: str, +) -> None: + await buck.build( + target, + *REMOTE_ARGS, + "--materializations=none", + ) + fail_digests_file.write_text( + await _single_action_output_digest(buck, output_path), + encoding="utf-8", + ) + + +async def _action_output_digests(buck: Buck, output_path: str) -> list[str]: + action_executions = await filter_events( + buck, + "Event", + "data", + "SpanEnd", + "data", + "ActionExecution", + ) + return [ + output["digest"] + for execution in action_executions + for output in execution.get("outputs", []) + if output.get("path") == output_path + ] + + +async def _single_action_output_digest(buck: Buck, output_path: str) -> str: + digests = await _action_output_digests(buck, output_path) + assert len(digests) == 1, digests + return digests[0] + + +async def _last_action_output_digest(buck: Buck, output_path: str) -> str: + digests = await _action_output_digests(buck, output_path) + assert digests, output_path + return digests[-1] + + +async def _assert_action_output_digest( + buck: Buck, + output_path: str, + expected_digest: str, +) -> None: + assert await _single_action_output_digest(buck, output_path) == expected_digest + + +def _single_recorded_digest(path: Path) -> str: + digests = path.read_text(encoding="utf-8").splitlines() + assert len(digests) == 1, digests + return digests[0] + + +@buck_test() +async def test_rewinds_generated_input_evicted_from_remote_cache_mid_build( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE": PRODUCER_DIGEST, + }, + ) + await buck.build( + TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + await _assert_action_output_digest(buck, "consumer.txt", CONSUMER_DIGEST) + await _assert_remote_actions_ran(buck, ["root//:producer", "root//:consumer"]) + + +@buck_test() +async def test_rewinds_generated_input_evicted_during_local_materialization( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE": PRODUCER_DIGEST, + }, + ) + result = await buck.build( + LOCAL_INPUT_CONSUMER_TARGET, + "--prefer-remote", + "--no-remote-cache", + ) + + output = result.get_build_report().output_for_target(LOCAL_INPUT_CONSUMER_TARGET) + assert output.read_text() == CONSUMER_CONTENT + await _assert_actions_ran_on( + buck, + { + "root//:local_input_producer": "Re", + "root//:local_input_consumer": "Local", + }, + ) + + +@buck_test() +async def test_rewinds_worker_reported_missing_generated_input( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_FAIL_RE_EXECUTE_MISSING_INPUTS_ONCE": "true", + }, + ) + await buck.build( + TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + await _assert_action_output_digest(buck, "consumer.txt", CONSUMER_DIGEST) + await _assert_remote_actions_ran(buck, ["root//:producer", "root//:consumer"]) + + +@buck_test() +async def test_rewinds_multiple_regular_inputs_evicted_from_remote_cache_mid_build( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE": ( + f"{FIRST_PRODUCER_DIGEST} {SECOND_PRODUCER_DIGEST}" + ), + }, + ) + await buck.build( + CONSUMER_TWO_TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + await _assert_action_output_digest(buck, "consumer_two.txt", CONSUMER_TWO_DIGEST) + await _assert_remote_actions_ran( + buck, + ["root//:producer_first", "root//:producer_second", "root//:consumer_two"], + ) + + +@buck_test() +async def test_rewinds_many_regular_inputs_evicted_from_remote_cache_in_one_retry( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE": " ".join( + MANY_PRODUCER_DIGESTS + ), + }, + ) + await buck.build( + CONSUMER_MANY_TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + await _assert_action_output_digest(buck, "consumer_many.txt", CONSUMER_MANY_DIGEST) + await _assert_remote_actions_ran( + buck, + ["root//:consumer_many"] + + [f"root//:producer_many_{i}" for i in range(MANY_INPUT_COUNT)], + ) + + +@buck_test() +async def test_rewinds_tree_file_input_evicted_from_remote_cache_mid_build( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE": TREE_PRODUCER_DIGEST, + }, + ) + await buck.build( + TREE_CONSUMER_TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + await _assert_action_output_digest( + buck, + "tree_consumer.txt", + TREE_CONSUMER_DIGEST, + ) + await _assert_remote_actions_ran( + buck, + ["root//:tree_producer", "root//:tree_consumer"], + ) + + +@buck_test() +async def test_rewinds_directory_input_leaf_evicted_from_remote_cache_mid_build( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE": TREE_PRODUCER_DIGEST, + }, + ) + await buck.build( + TREE_DIR_CONSUMER_TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + await _assert_action_output_digest( + buck, + "tree_dir_consumer.txt", + TREE_DIR_CONSUMER_DIGEST, + ) + await _assert_remote_actions_ran( + buck, + ["root//:tree_dir_producer", "root//:tree_dir_consumer"], + ) + + +@buck_test() +async def test_rewinds_requested_top_level_output_evicted_during_default_materialization( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE": TOP_LEVEL_OUTPUT_DIGEST, + }, + ) + result = await buck.build( + TOP_LEVEL_OUTPUT_TARGET, + *REMOTE_ARGS, + ) + + output = result.get_build_report().output_for_target(TOP_LEVEL_OUTPUT_TARGET) + assert output.read_text() == TOP_LEVEL_OUTPUT_CONTENT + await _assert_remote_actions_ran(buck, ["root//:top_level_output"]) + + +@buck_test() +async def test_rewinds_requested_output_evicted_during_final_upload_with_materialization( + buck: Buck, +) -> None: + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE": TOP_LEVEL_OUTPUT_DIGEST, + }, + ) + result = await buck.build( + TOP_LEVEL_OUTPUT_TARGET, + *REMOTE_ARGS, + "--upload-final-artifacts=always", + ) + + output = result.get_build_report().output_for_target(TOP_LEVEL_OUTPUT_TARGET) + assert output.read_text() == TOP_LEVEL_OUTPUT_CONTENT + assert await _action_output_digests(buck, "producer.txt") == [ + TOP_LEVEL_OUTPUT_DIGEST, + TOP_LEVEL_OUTPUT_DIGEST, + ] + await _assert_remote_actions_ran(buck, ["root//:top_level_output"]) + + +@buck_test() +async def test_rewinds_nondeterministic_generated_input_with_fresh_digest( + buck: Buck, + tmp_path: Path, +) -> None: + fail_digests_file = tmp_path / "lost-input-digests" + fail_digests_file.write_text("", encoding="utf-8") + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE_FILE": str(fail_digests_file), + }, + ) + await _seed_stale_digest_from_action_event( + buck, + fail_digests_file, + NONDETERMINISTIC_PRODUCER_TARGET, + "nondeterministic.txt", + ) + + await buck.build( + NONDETERMINISTIC_CONSUMER_TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + assert await _last_action_output_digest( + buck, + "nondeterministic.txt", + ) != _single_recorded_digest(fail_digests_file) + await _assert_remote_actions_ran( + buck, + ["root//:nondeterministic_producer", "root//:nondeterministic_consumer"], + ) + + +@buck_test() +async def test_rewinds_nondeterministic_intermediate_action_with_fresh_digest( + buck: Buck, + tmp_path: Path, +) -> None: + fail_digests_file = tmp_path / "lost-intermediate-digests" + fail_digests_file.write_text("", encoding="utf-8") + await _restart_with_test_env( + buck, + { + "BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE_FILE": str(fail_digests_file), + }, + ) + await _seed_stale_digest_from_action_event( + buck, + fail_digests_file, + NONDETERMINISTIC_MIDDLE_TARGET, + "nondeterministic_middle.txt", + ) + + await buck.build( + NONDETERMINISTIC_CHAIN_CONSUMER_TARGET, + *REMOTE_ARGS, + "--materializations=none", + ) + + assert await _last_action_output_digest( + buck, + "nondeterministic_middle.txt", + ) != _single_recorded_digest(fail_digests_file) + await _assert_remote_actions_ran( + buck, + [ + "root//:nondeterministic_middle", + "root//:nondeterministic_chain_consumer", + ], + ) diff --git a/tests/core/executor/test_action_rewinding_data/.buckconfig b/tests/core/executor/test_action_rewinding_data/.buckconfig new file mode 100644 index 0000000000000..cdd5c1186d890 --- /dev/null +++ b/tests/core/executor/test_action_rewinding_data/.buckconfig @@ -0,0 +1,12 @@ +[buck2] +digest_algorithms = SHA1 +materializations = deferred + +[buildfile] +name = TARGETS.fixture + +[cells] +root = . + +[build] +execution_platforms = root//platforms:platforms diff --git a/tests/core/executor/test_action_rewinding_data/.buckroot b/tests/core/executor/test_action_rewinding_data/.buckroot new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/tests/core/executor/test_action_rewinding_data/.buckroot @@ -0,0 +1 @@ + diff --git a/tests/core/executor/test_action_rewinding_data/TARGETS.fixture b/tests/core/executor/test_action_rewinding_data/TARGETS.fixture new file mode 100644 index 0000000000000..e59d02ba57641 --- /dev/null +++ b/tests/core/executor/test_action_rewinding_data/TARGETS.fixture @@ -0,0 +1,99 @@ +load( + ":defs.bzl", + "consumer", + "consumer_two", + "many_input_targets", + "nondeterministic_middle", + "nondeterministic_producer", + "prefixed_consumer", + "producer", + "tree_dir_consumer", + "tree_dir_producer", + "tree_consumer", + "tree_producer", +) + +producer( + name = "producer", +) + +producer( + name = "producer_first", + content = "first generated input for action rewind\n", +) + +producer( + name = "producer_second", + content = "second generated input for action rewind\n", +) + +consumer( + name = "consumer", + dep = ":producer", +) + +producer( + name = "local_input_producer", + prefer_remote = True, +) + +consumer( + name = "local_input_consumer", + dep = ":local_input_producer", + local_only = True, +) + +consumer_two( + name = "consumer_two", + first = ":producer_first", + second = ":producer_second", +) + +many_input_targets(25) + +producer( + name = "top_level_output", + content = "top-level generated output for action rewind\n", +) + +tree_producer( + name = "tree_producer", +) + +tree_consumer( + name = "tree_consumer", + dep = ":tree_producer", +) + +tree_dir_producer( + name = "tree_dir_producer", +) + +tree_dir_consumer( + name = "tree_dir_consumer", + dep = ":tree_dir_producer", +) + +nondeterministic_producer( + name = "nondeterministic_producer", +) + +prefixed_consumer( + name = "nondeterministic_consumer", + dep = ":nondeterministic_producer", + out = "nondeterministic_consumer.txt", + prefix = "nondeterministic consumer saw: ", +) + +nondeterministic_middle( + name = "nondeterministic_middle", + dep = ":nondeterministic_producer", +) + +prefixed_consumer( + name = "nondeterministic_chain_consumer", + category = "consume_nondeterministic_chain", + dep = ":nondeterministic_middle", + out = "nondeterministic_chain_consumer.txt", + prefix = "nondeterministic chain consumer saw: ", +) diff --git a/tests/core/executor/test_action_rewinding_data/defs.bzl b/tests/core/executor/test_action_rewinding_data/defs.bzl new file mode 100644 index 0000000000000..04f50364f1c18 --- /dev/null +++ b/tests/core/executor/test_action_rewinding_data/defs.bzl @@ -0,0 +1,291 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is dual-licensed under either the MIT license found in the +# LICENSE-MIT file in the root directory of this source tree or the Apache +# License, Version 2.0 found in the LICENSE-APACHE file in the root directory +# of this source tree. You may select, at your option, one of the +# above-listed licenses. + +def _producer_impl(ctx): + out = ctx.actions.declare_output("producer.txt", has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "printf '%s' \"$1\" > \"$2\"", + "--", + ctx.attrs.content, + out.as_output(), + ], + category = "produce", + prefer_remote = ctx.attrs.prefer_remote, + ) + return [DefaultInfo(default_output = out)] + +producer = rule( + impl = _producer_impl, + attrs = { + "content": attrs.string(default = "generated input for action rewind\n"), + "prefer_remote": attrs.bool(default = False), + }, +) + +def _consumer_impl(ctx): + inp = ctx.attrs.dep[DefaultInfo].default_outputs[0] + out = ctx.actions.declare_output("consumer.txt", has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "printf 'consumer saw: ' > \"$2\"; cat \"$1\" >> \"$2\"", + "--", + inp, + out.as_output(), + ], + category = "consume", + local_only = ctx.attrs.local_only, + ) + return [DefaultInfo(default_output = out)] + +consumer = rule( + impl = _consumer_impl, + attrs = { + "dep": attrs.dep(), + "local_only": attrs.bool(default = False), + }, +) + +def _consumer_two_impl(ctx): + first = ctx.attrs.first[DefaultInfo].default_outputs[0] + second = ctx.attrs.second[DefaultInfo].default_outputs[0] + out = ctx.actions.declare_output("consumer_two.txt", has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "printf 'multi consumer saw:\n' > \"$3\"; cat \"$1\" >> \"$3\"; cat \"$2\" >> \"$3\"", + "--", + first, + second, + out.as_output(), + ], + category = "consume_two", + ) + return [DefaultInfo(default_output = out)] + +consumer_two = rule( + impl = _consumer_two_impl, + attrs = { + "first": attrs.dep(), + "second": attrs.dep(), + }, +) + +def _consumer_many_impl(ctx): + inputs = [dep[DefaultInfo].default_outputs[0] for dep in ctx.attrs.deps] + out = ctx.actions.declare_output("consumer_many.txt", has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + ( + "out=\"$1\"; shift; printf 'many consumer saw:\n' > \"$out\"; " + "for input in \"$@\"; do cat \"$input\" >> \"$out\"; done" + ), + "--", + out.as_output(), + ] + inputs, + category = "consume_many", + ) + return [DefaultInfo(default_output = out)] + +consumer_many = rule( + impl = _consumer_many_impl, + attrs = { + "deps": attrs.list(attrs.dep()), + }, +) + +def many_input_targets(count): + deps = [] + for i in range(count): + name = "producer_many_{}".format(i) + producer( + name = name, + content = "many generated input {} for action rewind\n".format(i), + ) + deps.append(":{}".format(name)) + + consumer_many( + name = "consumer_many", + deps = deps, + ) + +def _tree_producer_impl(ctx): + out = ctx.actions.declare_output("tree", dir = True, has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "mkdir -p \"$2\"; printf '%s' \"$1\" > \"$2/file.txt\"", + "--", + ctx.attrs.content, + out.as_output(), + ], + category = "produce_tree", + ) + return [DefaultInfo(default_outputs = [out.project("file.txt")])] + +tree_producer = rule( + impl = _tree_producer_impl, + attrs = { + "content": attrs.string(default = "tree generated input for action rewind\n"), + }, +) + +def _tree_dir_producer_impl(ctx): + out = ctx.actions.declare_output("tree_dir", dir = True, has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "mkdir -p \"$2\"; printf '%s' \"$1\" > \"$2/file.txt\"", + "--", + ctx.attrs.content, + out.as_output(), + ], + category = "produce_tree_dir", + ) + return [DefaultInfo(default_output = out)] + +tree_dir_producer = rule( + impl = _tree_dir_producer_impl, + attrs = { + "content": attrs.string(default = "tree generated input for action rewind\n"), + }, +) + +def _tree_consumer_impl(ctx): + inp = ctx.attrs.dep[DefaultInfo].default_outputs[0] + out = ctx.actions.declare_output("tree_consumer.txt", has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "printf 'tree consumer saw: ' > \"$2\"; cat \"$1\" >> \"$2\"", + "--", + inp, + out.as_output(), + ], + category = "consume_tree", + ) + return [DefaultInfo(default_output = out)] + +tree_consumer = rule( + impl = _tree_consumer_impl, + attrs = { + "dep": attrs.dep(), + }, +) + +def _tree_dir_consumer_impl(ctx): + inp = ctx.attrs.dep[DefaultInfo].default_outputs[0] + out = ctx.actions.declare_output("tree_dir_consumer.txt", has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "printf 'tree dir consumer saw: ' > \"$2\"; cat \"$1/file.txt\" >> \"$2\"", + "--", + inp, + out.as_output(), + ], + category = "consume_tree_dir", + ) + return [DefaultInfo(default_output = out)] + +tree_dir_consumer = rule( + impl = _tree_dir_consumer_impl, + attrs = { + "dep": attrs.dep(), + }, +) + +def _nondeterministic_producer_impl(ctx): + out = ctx.actions.declare_output( + "nondeterministic.txt", + has_content_based_path = False, + ) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "cat /proc/sys/kernel/random/uuid > \"$1\" 2>/dev/null || date +%s%N > \"$1\"", + "--", + out.as_output(), + ], + category = "produce_nondeterministic", + ) + return [DefaultInfo(default_output = out)] + +nondeterministic_producer = rule( + impl = _nondeterministic_producer_impl, + attrs = {}, +) + +def _nondeterministic_middle_impl(ctx): + inp = ctx.attrs.dep[DefaultInfo].default_outputs[0] + out = ctx.actions.declare_output( + "nondeterministic_middle.txt", + has_content_based_path = False, + ) + command = "cat \"$1\" > \"$2\"; " + ( + "cat /proc/sys/kernel/random/uuid >> \"$2\" 2>/dev/null || " + + "date +%s%N >> \"$2\"" + ) + ctx.actions.run( + [ + "/bin/sh", + "-c", + command, + "--", + inp, + out.as_output(), + ], + category = "middle_nondeterministic", + ) + return [DefaultInfo(default_output = out)] + +nondeterministic_middle = rule( + impl = _nondeterministic_middle_impl, + attrs = { + "dep": attrs.dep(), + }, +) + +def _prefixed_consumer_impl(ctx): + inp = ctx.attrs.dep[DefaultInfo].default_outputs[0] + out = ctx.actions.declare_output(ctx.attrs.out, has_content_based_path = False) + ctx.actions.run( + [ + "/bin/sh", + "-c", + "printf '%s' \"$1\" > \"$3\"; cat \"$2\" >> \"$3\"", + "--", + ctx.attrs.prefix, + inp, + out.as_output(), + ], + category = ctx.attrs.category, + ) + return [DefaultInfo(default_output = out)] + +prefixed_consumer = rule( + impl = _prefixed_consumer_impl, + attrs = { + "category": attrs.string(default = "consume_prefixed"), + "dep": attrs.dep(), + "out": attrs.string(), + "prefix": attrs.string(), + }, +) diff --git a/tests/core/executor/test_action_rewinding_data/platforms/TARGETS.fixture b/tests/core/executor/test_action_rewinding_data/platforms/TARGETS.fixture new file mode 100644 index 0000000000000..80533d33c2a4b --- /dev/null +++ b/tests/core/executor/test_action_rewinding_data/platforms/TARGETS.fixture @@ -0,0 +1,5 @@ +load(":defs.bzl", "execution_platforms") + +execution_platforms( + name = "platforms", +) diff --git a/tests/core/executor/test_action_rewinding_data/platforms/defs.bzl b/tests/core/executor/test_action_rewinding_data/platforms/defs.bzl new file mode 100644 index 0000000000000..733d3e23e4426 --- /dev/null +++ b/tests/core/executor/test_action_rewinding_data/platforms/defs.bzl @@ -0,0 +1,33 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is dual-licensed under either the MIT license found in the +# LICENSE-MIT file in the root directory of this source tree or the Apache +# License, Version 2.0 found in the LICENSE-APACHE file in the root directory +# of this source tree. You may select, at your option, one of the +# above-listed licenses. + +def _execution_platform(ctx): + platform = ExecutionPlatformInfo( + label = ctx.label.raw_target(), + configuration = ConfigurationInfo( + constraints = {}, + values = {}, + ), + executor_config = CommandExecutorConfig( + local_enabled = True, + remote_enabled = True, + remote_cache_enabled = False, + remote_execution_properties = { + "platform": "linux-remote-execution", + }, + remote_execution_use_case = "buck2-testing", + remote_execution_action_key = "executor", + ), + ) + + return [ + DefaultInfo(), + ExecutionPlatformRegistrationInfo(platforms = [platform]), + ] + +execution_platforms = rule(attrs = {}, impl = _execution_platform) diff --git a/tests/core/help/test_help_env_data/buck2-help-env-testing.golden.txt b/tests/core/help/test_help_env_data/buck2-help-env-testing.golden.txt index c820bd49e6772..430b8db0a996d 100644 --- a/tests/core/help/test_help_env_data/buck2-help-env-testing.golden.txt +++ b/tests/core/help/test_help_env_data/buck2-help-env-testing.golden.txt @@ -44,7 +44,10 @@ BUCK2_TEST_EXTRA_EXTERNAL_CONFIG String BUCK2_TEST_FAIL_BUCKD_AUTH bool false BUCK2_TEST_FAIL_CONNECT bool false BUCK2_TEST_FAIL_RE_DOWNLOADS bool false +BUCK2_TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE Vec +BUCK2_TEST_FAIL_RE_DOWNLOAD_DIGESTS_ONCE_FILE String BUCK2_TEST_FAIL_RE_EXECUTE bool false +BUCK2_TEST_FAIL_RE_EXECUTE_MISSING_INPUTS_ONCE bool false BUCK2_TEST_FAIL_RE_RESOURCE_EXHAUSTED bool false BUCK2_TEST_FAIL_STREAMING bool false BUCK2_TEST_FAKE_SYSTEM_TOTAL_MEMORY u64 @@ -53,6 +56,8 @@ BUCK2_TEST_FORCE_DECLARE_MISMATCH bool false BUCK2_TEST_INIT_DAEMON_ERROR bool false BUCK2_TEST_INIT_DATA_SLEEP_SECS u64 BUCK2_TEST_INJECTED_MISSING_DIGESTS Vec +BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE Vec +BUCK2_TEST_INJECTED_MISSING_DIGESTS_ONCE_FILE String BUCK2_TEST_MANIFOLD_CHUNK_BYTES u64 BUCK2_TEST_MANIFOLD_TTL_S u64 BUCK2_TEST_ONLY_REMOTE_DEP_FILE_CACHE bool false