Skip to content

Commit

Permalink
buck2_execute: implement OSS upload_blob for local_only cache upl…
Browse files Browse the repository at this point in the history
…oads (#765)

Summary:
Forward-port of patch 4 in <#477>, providing a clear piece of missing functionality: in the event that stdout or stderr were more than 50KiB of output when caching `local_only` actions, then this dead path was taken, and so stdout/stderr would not be uploaded successfully in the cache.

Pull Request resolved: #765

Reviewed By: ndmitchell, rajneesh

Differential Revision: D66988951

Pulled By: IanChilds

fbshipit-source-id: a9b53eacb16af3ed50df41405842138dcb2d3f83
  • Loading branch information
cormacrelf authored and facebook-github-bot committed Feb 7, 2025
1 parent 53f4592 commit c5c807a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
15 changes: 12 additions & 3 deletions app/buck2_execute/src/execute/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use buck2_common::file_ops::FileDigest;
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
use buck2_error::BuckErrorContext;
use futures::future;
use remote_execution::InlinedBlobWithDigest;
use remote_execution::TDigest;

use crate::digest::CasDigestConversionResultExt;
use crate::digest::CasDigestFromReExt;
use crate::digest::CasDigestToReExt;
use crate::digest_config::DigestConfig;
use crate::re::manager::ManagedRemoteExecutionClient;
use crate::re::streams::RemoteCommandStdStreams;
Expand Down Expand Up @@ -238,12 +240,13 @@ impl CommandStdStreams {
self,
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
digest_config: DigestConfig,
) -> buck2_error::Result<StdStreamPair<ReStdStream>> {
match self {
Self::Local { stdout, stderr } => {
let (stdout, stderr) = future::try_join(
maybe_upload_to_re(client, use_case, stdout),
maybe_upload_to_re(client, use_case, stderr),
maybe_upload_to_re(client, use_case, stdout, digest_config),
maybe_upload_to_re(client, use_case, stderr, digest_config),
)
.await?;

Expand Down Expand Up @@ -277,11 +280,17 @@ async fn maybe_upload_to_re(
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
bytes: Vec<u8>,
digest_config: DigestConfig,
) -> buck2_error::Result<ReStdStream> {
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
return Ok(ReStdStream::Raw(bytes));
}
let digest = client.upload_blob(bytes, use_case).await?;
let inline_blob = InlinedBlobWithDigest {
digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
blob: bytes,
..Default::default()
};
let digest = client.upload_blob(inline_blob, use_case).await?;
Ok(ReStdStream::Digest(digest))
}
6 changes: 3 additions & 3 deletions app/buck2_execute/src/re/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl RemoteExecutionClient {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> buck2_error::Result<TDigest> {
self.data
Expand Down Expand Up @@ -1228,14 +1228,14 @@ impl RemoteExecutionClientImpl {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> buck2_error::Result<TDigest> {
with_error_handler(
"upload_blob",
self.get_session_id(),
self.client()
.upload_blob(blob, use_case.metadata(None))
.upload_blob_with_digest(blob.blob, blob.digest, use_case.metadata(None))
.await,
)
.await
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute/src/re/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl ManagedRemoteExecutionClient {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> buck2_error::Result<TDigest> {
let use_case = self.re_use_case_override.unwrap_or(use_case);
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute_impl/src/executors/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ impl CacheUploader {
.report
.std_streams
.clone()
.into_re(&self.re_client, self.re_use_case)
.into_re(&self.re_client, self.re_use_case, digest_config)
.await
.buck_error_context("Error accessing std_streams")
};
Expand Down
26 changes: 21 additions & 5 deletions remote_execution/oss/re_grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,29 @@ impl REClient {
.await
}

pub async fn upload_blob(
pub async fn upload_blob_with_digest(
&self,
_blob: Vec<u8>,
_metadata: RemoteExecutionMetadata,
blob: Vec<u8>,
digest: TDigest,
metadata: RemoteExecutionMetadata,
) -> anyhow::Result<TDigest> {
// TODO(aloiscochard)
Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
let blob = InlinedBlobWithDigest {
digest: digest.clone(),
blob,
..Default::default()
};
self.upload(
metadata,
UploadRequest {
inlined_blobs_with_digest: Some(vec![blob]),
files_with_digest: None,
directories: None,
upload_only_missing: false,
..Default::default()
},
)
.await?;
Ok(digest)
}

pub async fn download(
Expand Down

0 comments on commit c5c807a

Please sign in to comment.