-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement missing parts of OSS RE caching #477
base: main
Are you sure you want to change the base?
Changes from all commits
3c96e77
966ded2
ca99697
4fc77c1
00dafa8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
load(":defs.bzl", "tests") | ||
|
||
tests(name = "tests") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# Copyright (c) Meta Platforms, Inc. and affiliates. | ||
# | ||
# This source code is licensed under both the MIT license found in the | ||
# LICENSE-MIT file in the root directory of this source tree and the Apache | ||
# License, Version 2.0 found in the LICENSE-APACHE file in the root directory | ||
# of this source tree. | ||
|
||
def _tests(ctx): | ||
# Create a large stdout stream locally, and upload it to CAS. | ||
# The limit for inline stdout is 50KiB. So this will force calling client.upload_blob. | ||
stage0 = ctx.actions.declare_output("stage0") | ||
ctx.actions.run( | ||
["sh", "-c", 'yes abcdefghijklmnopqrstuvwxyz | head -c 65536 && echo done > "$1"', "--", stage0.as_output()], | ||
category = "stage0", | ||
local_only = True, | ||
allow_cache_upload = True, | ||
) | ||
|
||
return [DefaultInfo(stage0)] | ||
|
||
tests = rule(attrs = {}, impl = _tests) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,11 +41,15 @@ use re_grpc_proto::build::bazel::remote::execution::v2::Digest; | |
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteOperationMetadata; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteRequest as GExecuteRequest; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteResponse as GExecuteResponse; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::ExecutedActionMetadata; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsRequest; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsResponse; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::GetActionResultRequest; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::GetCapabilitiesRequest; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::OutputDirectory; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::OutputFile; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::ResultsCachePolicy; | ||
use re_grpc_proto::build::bazel::remote::execution::v2::UpdateActionResultRequest; | ||
use re_grpc_proto::google::bytestream::byte_stream_client::ByteStreamClient; | ||
use re_grpc_proto::google::bytestream::ReadRequest; | ||
use re_grpc_proto::google::bytestream::ReadResponse; | ||
|
@@ -115,6 +119,13 @@ fn check_status(status: Status) -> Result<(), REClientError> { | |
}) | ||
} | ||
|
||
fn ttimestamp_to(ts: TTimestamp) -> Option<prost_types::Timestamp> { | ||
Some(prost_types::Timestamp { | ||
seconds: ts.seconds, | ||
nanos: ts.nanos, | ||
}) | ||
} | ||
|
||
fn ttimestamp_from(ts: Option<::prost_types::Timestamp>) -> TTimestamp { | ||
match ts { | ||
Some(timestamp) => TTimestamp { | ||
|
@@ -499,10 +510,33 @@ impl REClient { | |
|
||
pub async fn write_action_result( | ||
&self, | ||
_metadata: RemoteExecutionMetadata, | ||
_request: WriteActionResultRequest, | ||
metadata: RemoteExecutionMetadata, | ||
write_request: WriteActionResultRequest, | ||
) -> anyhow::Result<WriteActionResultResponse> { | ||
Err(anyhow::anyhow!("Not supported")) | ||
let mut client = self.grpc_clients.action_cache_client.clone(); | ||
let action_digest = tdigest_to(write_request.action_digest.clone()); | ||
let action_result = convert_taction_result_to_rbe(write_request.action_result)?; | ||
let request = UpdateActionResultRequest { | ||
action_digest: Some(action_digest), | ||
action_result: Some(action_result), | ||
results_cache_policy: None, | ||
instance_name: self.instance_name.as_str().to_owned(), | ||
}; | ||
|
||
let t: ActionResult = client | ||
.update_action_result(with_internal_metadata(request, metadata)) | ||
.await? | ||
.into_inner(); | ||
|
||
let result = convert_action_result(t)?; | ||
let result = WriteActionResultResponse { | ||
actual_action_result: result, | ||
// NOTE: This is an arbitrary number because RBE does not return information | ||
// on the TTL of the ActionResult. | ||
// Also buck2 does not appear to read this value anywhere. | ||
ttl_seconds: 0, | ||
}; | ||
Ok(result) | ||
} | ||
|
||
pub async fn execute_with_progress( | ||
|
@@ -655,11 +689,21 @@ impl REClient { | |
|
||
pub async fn upload_blob( | ||
&self, | ||
_blob: Vec<u8>, | ||
_metadata: RemoteExecutionMetadata, | ||
) -> anyhow::Result<TDigest> { | ||
// TODO(aloiscochard) | ||
Err(anyhow::anyhow!("Not implemented (RE upload_blob)")) | ||
blob: InlinedBlobWithDigest, | ||
metadata: RemoteExecutionMetadata, | ||
) -> anyhow::Result<()> { | ||
self.upload( | ||
metadata, | ||
UploadRequest { | ||
inlined_blobs_with_digest: Some(vec![blob]), | ||
files_with_digest: None, | ||
directories: None, | ||
upload_only_missing: false, | ||
..Default::default() | ||
}, | ||
) | ||
.await?; | ||
Ok(()) | ||
} | ||
|
||
pub async fn download( | ||
|
@@ -771,7 +815,118 @@ impl REClient { | |
} | ||
} | ||
|
||
fn convert_execution_action_metadata_to_rbe( | ||
execution_metadata: TExecutedActionMetadata, | ||
) -> anyhow::Result<ExecutedActionMetadata> { | ||
let TExecutedActionMetadata { | ||
worker, | ||
queued_timestamp, | ||
worker_start_timestamp, | ||
worker_completed_timestamp, | ||
input_fetch_start_timestamp, | ||
input_fetch_completed_timestamp, | ||
execution_start_timestamp, | ||
execution_completed_timestamp, | ||
output_upload_start_timestamp, | ||
output_upload_completed_timestamp, | ||
execution_dir: _, | ||
input_analyzing_start_timestamp: _, | ||
input_analyzing_completed_timestamp: _, | ||
execution_attempts: _, | ||
last_queued_timestamp: _, | ||
instruction_counts: _, | ||
auxiliary_metadata: _, | ||
_dot_dot_default, | ||
} = execution_metadata; | ||
Ok(ExecutedActionMetadata { | ||
worker, | ||
worker_start_timestamp: ttimestamp_to(worker_start_timestamp), | ||
worker_completed_timestamp: ttimestamp_to(worker_completed_timestamp), | ||
input_fetch_start_timestamp: ttimestamp_to(input_fetch_start_timestamp), | ||
input_fetch_completed_timestamp: ttimestamp_to(input_fetch_completed_timestamp), | ||
execution_start_timestamp: ttimestamp_to(execution_start_timestamp), | ||
execution_completed_timestamp: ttimestamp_to(execution_completed_timestamp), | ||
output_upload_start_timestamp: ttimestamp_to(output_upload_start_timestamp), | ||
output_upload_completed_timestamp: ttimestamp_to(output_upload_completed_timestamp), | ||
queued_timestamp: ttimestamp_to(queued_timestamp), | ||
// TODO(cormacrelf): calculate this in a reasonable way for buck. | ||
// see protobuf docs on virtual_execution_duration. | ||
// May be able to use last_queued_timestamp | ||
virtual_execution_duration: None, | ||
// Ugh, need a routine to convert TAny to prost_type::Any... | ||
auxiliary_metadata: vec![], | ||
}) | ||
} | ||
|
||
fn convert_taction_result_to_rbe(taction_result: TActionResult2) -> anyhow::Result<ActionResult> { | ||
let TActionResult2 { | ||
output_files, | ||
output_directories, | ||
exit_code, | ||
stdout_raw, | ||
stdout_digest, | ||
stderr_raw, | ||
stderr_digest, | ||
execution_metadata, | ||
auxiliary_metadata: _, | ||
_dot_dot_default, | ||
} = taction_result; | ||
|
||
let execution_metadata = convert_execution_action_metadata_to_rbe(execution_metadata)?; | ||
let output_files = output_files.into_try_map(|output_file| { | ||
let TFile { | ||
digest, | ||
name, | ||
executable, | ||
.. | ||
} = output_file; | ||
anyhow::Ok(OutputFile { | ||
digest: Some(tdigest_to(digest.digest)), | ||
path: name, | ||
is_executable: executable, | ||
// Clients SHOULD NOT populate this field when uploading to the cache. | ||
contents: Vec::new(), | ||
node_properties: None, | ||
}) | ||
})?; | ||
let output_directories = output_directories.into_try_map(|output_directory| { | ||
let tree_digest = tdigest_to(output_directory.tree_digest); | ||
anyhow::Ok(OutputDirectory { | ||
path: output_directory.path, | ||
tree_digest: Some(tree_digest.clone()), | ||
// TODO(cormacrelf): check whether buck2_execute::directory::directory_to_re_tree | ||
// conforms with the requirements of passing `true` here (see .proto file) | ||
is_topologically_sorted: false, | ||
}) | ||
})?; | ||
anyhow::Ok(ActionResult { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to not just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's just splatter from copy pasting the function below and inverting every operation, instead of coding it from scratch. I'll tidy this up, a few of these don't even need to be Result. |
||
exit_code, | ||
execution_metadata: Some(execution_metadata), | ||
output_directories, | ||
output_files, | ||
// TODO: support symlinks | ||
output_symlinks: vec![], | ||
output_file_symlinks: vec![], | ||
output_directory_symlinks: vec![], | ||
// If missing, it's because we uploaded it already | ||
// if present, it's inline | ||
stdout_raw: stdout_raw.unwrap_or(Vec::new()), | ||
stdout_digest: stdout_digest.map(tdigest_to), | ||
stderr_raw: stderr_raw.unwrap_or(Vec::new()), | ||
stderr_digest: stderr_digest.map(tdigest_to), | ||
}) | ||
} | ||
|
||
fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionResult2> { | ||
if !action_result.output_symlinks.is_empty() | ||
|| !action_result.output_file_symlinks.is_empty() | ||
|| !action_result.output_directory_symlinks.is_empty() | ||
{ | ||
anyhow::bail!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We tend to avoid bail because it can be hard to reason about - easier to see the explicit return |
||
"CAS ActionResult returned with symlinks in it, buck2 cannot handle these yet" | ||
); | ||
} | ||
|
||
let execution_metadata = action_result | ||
.execution_metadata | ||
.with_context(|| "The execution metadata are not defined.")?; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,6 +91,9 @@ pub struct TSubsysPerfCount { | |
pub struct TActionResult2 { | ||
pub output_files: Vec<TFile>, | ||
pub output_directories: Vec<TDirectory2>, | ||
// TODO: output_symlinks (use in preference when output_paths mode is used the execution side) | ||
// TODO: output_file_symlinks (deprecated) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why mark this as a todo? Is it something we should add? Or is this just for when servers return deprecated stuff? |
||
// TODO: output_directory_symlinks (deprecated) | ||
pub exit_code: i32, | ||
pub stdout_raw: Option<Vec<u8>>, | ||
pub stdout_digest: Option<TDigest>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this not appear in https://github.com/facebook/buck2/blob/main/app/buck2_execute_impl/src/executors/caching.rs#L108C25-L108C25 as a warning? It certainly should
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That line may have run, but I definitely did not see it in the terminal. Thanks for the pointer, I will have a look at why.