Skip to content

Commit 52dac90

Browse files
buck2_execute: implement OSS upload_blob for local_only cache uploads
Forward-port of patch 4 in <#477>, providing a clear piece of missing functionality: in the event that stdout or stderr were more than 50KiB of output when caching `local_only` actions, then this dead path was taken, and so stdout/stderr would not be uploaded successfully in the cache. Co-authored-by: Austin Seipp <[email protected]>
1 parent b31218f commit 52dac90

File tree

5 files changed

+34
-13
lines changed

5 files changed

+34
-13
lines changed

app/buck2_execute/src/execute/output.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ use anyhow::Context;
1414
use buck2_common::file_ops::FileDigest;
1515
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
1616
use futures::future;
17+
use remote_execution::InlinedBlobWithDigest;
1718
use remote_execution::TDigest;
1819

1920
use crate::digest::CasDigestConversionResultExt;
2021
use crate::digest::CasDigestFromReExt;
22+
use crate::digest::CasDigestToReExt;
2123
use crate::digest_config::DigestConfig;
2224
use crate::re::manager::ManagedRemoteExecutionClient;
2325
use crate::re::streams::RemoteCommandStdStreams;
@@ -238,12 +240,13 @@ impl CommandStdStreams {
238240
self,
239241
client: &ManagedRemoteExecutionClient,
240242
use_case: RemoteExecutorUseCase,
243+
digest_config: DigestConfig,
241244
) -> anyhow::Result<StdStreamPair<ReStdStream>> {
242245
match self {
243246
Self::Local { stdout, stderr } => {
244247
let (stdout, stderr) = future::try_join(
245-
maybe_upload_to_re(client, use_case, stdout),
246-
maybe_upload_to_re(client, use_case, stderr),
248+
maybe_upload_to_re(client, use_case, stdout, digest_config),
249+
maybe_upload_to_re(client, use_case, stderr, digest_config),
247250
)
248251
.await?;
249252

@@ -276,11 +279,17 @@ async fn maybe_upload_to_re(
276279
client: &ManagedRemoteExecutionClient,
277280
use_case: RemoteExecutorUseCase,
278281
bytes: Vec<u8>,
282+
digest_config: DigestConfig,
279283
) -> anyhow::Result<ReStdStream> {
280284
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
281285
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
282286
return Ok(ReStdStream::Raw(bytes));
283287
}
284-
let digest = client.upload_blob(bytes, use_case).await?;
288+
let inline_blob = InlinedBlobWithDigest {
289+
digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
290+
blob: bytes,
291+
..Default::default()
292+
};
293+
let digest = client.upload_blob(inline_blob, use_case).await?;
285294
Ok(ReStdStream::Digest(digest))
286295
}

app/buck2_execute/src/re/client.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ impl RemoteExecutionClient {
277277

278278
pub async fn upload_blob(
279279
&self,
280-
blob: Vec<u8>,
280+
blob: InlinedBlobWithDigest,
281281
use_case: RemoteExecutorUseCase,
282282
) -> anyhow::Result<TDigest> {
283283
self.data
@@ -1138,17 +1138,19 @@ impl RemoteExecutionClientImpl {
11381138

11391139
pub async fn upload_blob(
11401140
&self,
1141-
blob: Vec<u8>,
1141+
blob: InlinedBlobWithDigest,
11421142
use_case: RemoteExecutorUseCase,
11431143
) -> anyhow::Result<TDigest> {
1144+
let digest = blob.digest.clone();
11441145
with_error_handler(
11451146
"upload_blob",
11461147
self.get_session_id(),
11471148
self.client()
11481149
.upload_blob(blob, use_case.metadata(None))
11491150
.await,
11501151
)
1151-
.await
1152+
.await?;
1153+
Ok(digest)
11521154
}
11531155

11541156
async fn materialize_files(

app/buck2_execute/src/re/manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ impl ManagedRemoteExecutionClient {
451451

452452
pub async fn upload_blob(
453453
&self,
454-
blob: Vec<u8>,
454+
blob: InlinedBlobWithDigest,
455455
use_case: RemoteExecutorUseCase,
456456
) -> anyhow::Result<TDigest> {
457457
self.lock()?.get().await?.upload_blob(blob, use_case).await

app/buck2_execute_impl/src/executors/caching.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ impl CacheUploader {
417417
.report
418418
.std_streams
419419
.clone()
420-
.into_re(&self.re_client, self.re_use_case)
420+
.into_re(&self.re_client, self.re_use_case, digest_config)
421421
.await
422422
.context("Error accessing std_streams")
423423
};

remote_execution/oss/re_grpc/src/client.rs

+15-5
Original file line numberDiff line numberDiff line change
@@ -760,11 +760,21 @@ impl REClient {
760760

761761
pub async fn upload_blob(
762762
&self,
763-
_blob: Vec<u8>,
764-
_metadata: RemoteExecutionMetadata,
765-
) -> anyhow::Result<TDigest> {
766-
// TODO(aloiscochard)
767-
Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
763+
blob: InlinedBlobWithDigest,
764+
metadata: RemoteExecutionMetadata,
765+
) -> anyhow::Result<()> {
766+
self.upload(
767+
metadata,
768+
UploadRequest {
769+
inlined_blobs_with_digest: Some(vec![blob]),
770+
files_with_digest: None,
771+
directories: None,
772+
upload_only_missing: false,
773+
..Default::default()
774+
},
775+
)
776+
.await?;
777+
Ok(())
768778
}
769779

770780
pub async fn download(

0 commit comments

Comments
 (0)