Skip to content

Commit f70454a

Browse files
buck2_execute: implement OSS upload_blob for local_only cache uploads
Forward-port of patch 4 in <#477>, providing a clear piece of missing functionality: in the event that stdout or stderr were more than 50KiB of output when caching `local_only` actions, then this dead path was taken, and so stdout/stderr would not be uploaded successfully in the cache. Co-authored-by: Austin Seipp <[email protected]>
1 parent 5984aae commit f70454a

File tree

5 files changed

+34
-13
lines changed

5 files changed

+34
-13
lines changed

app/buck2_execute/src/execute/output.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ use anyhow::Context;
1414
use buck2_common::file_ops::FileDigest;
1515
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
1616
use futures::future;
17+
use remote_execution::InlinedBlobWithDigest;
1718
use remote_execution::TDigest;
1819

1920
use crate::digest::CasDigestConversionResultExt;
2021
use crate::digest::CasDigestFromReExt;
22+
use crate::digest::CasDigestToReExt;
2123
use crate::digest_config::DigestConfig;
2224
use crate::re::manager::ManagedRemoteExecutionClient;
2325
use crate::re::streams::RemoteCommandStdStreams;
@@ -238,12 +240,13 @@ impl CommandStdStreams {
238240
self,
239241
client: &ManagedRemoteExecutionClient,
240242
use_case: RemoteExecutorUseCase,
243+
digest_config: DigestConfig,
241244
) -> anyhow::Result<StdStreamPair<ReStdStream>> {
242245
match self {
243246
Self::Local { stdout, stderr } => {
244247
let (stdout, stderr) = future::try_join(
245-
maybe_upload_to_re(client, use_case, stdout),
246-
maybe_upload_to_re(client, use_case, stderr),
248+
maybe_upload_to_re(client, use_case, stdout, digest_config),
249+
maybe_upload_to_re(client, use_case, stderr, digest_config),
247250
)
248251
.await?;
249252

@@ -276,11 +279,17 @@ async fn maybe_upload_to_re(
276279
client: &ManagedRemoteExecutionClient,
277280
use_case: RemoteExecutorUseCase,
278281
bytes: Vec<u8>,
282+
digest_config: DigestConfig,
279283
) -> anyhow::Result<ReStdStream> {
280284
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
281285
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
282286
return Ok(ReStdStream::Raw(bytes));
283287
}
284-
let digest = client.upload_blob(bytes, use_case).await?;
288+
let inline_blob = InlinedBlobWithDigest {
289+
digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
290+
blob: bytes,
291+
..Default::default()
292+
};
293+
let digest = client.upload_blob(inline_blob, use_case).await?;
285294
Ok(ReStdStream::Digest(digest))
286295
}

app/buck2_execute/src/re/client.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ impl RemoteExecutionClient {
318318

319319
pub async fn upload_blob(
320320
&self,
321-
blob: Vec<u8>,
321+
blob: InlinedBlobWithDigest,
322322
use_case: RemoteExecutorUseCase,
323323
) -> anyhow::Result<TDigest> {
324324
self.data
@@ -1151,17 +1151,19 @@ impl RemoteExecutionClientImpl {
11511151

11521152
pub async fn upload_blob(
11531153
&self,
1154-
blob: Vec<u8>,
1154+
blob: InlinedBlobWithDigest,
11551155
use_case: RemoteExecutorUseCase,
11561156
) -> anyhow::Result<TDigest> {
1157+
let digest = blob.digest.clone();
11571158
with_error_handler(
11581159
"upload_blob",
11591160
self.get_session_id(),
11601161
self.client()
11611162
.upload_blob(blob, use_case.metadata(None))
11621163
.await,
11631164
)
1164-
.await
1165+
.await?;
1166+
Ok(digest)
11651167
}
11661168

11671169
async fn materialize_files(

app/buck2_execute/src/re/manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ impl ManagedRemoteExecutionClient {
460460

461461
pub async fn upload_blob(
462462
&self,
463-
blob: Vec<u8>,
463+
blob: InlinedBlobWithDigest,
464464
use_case: RemoteExecutorUseCase,
465465
) -> anyhow::Result<TDigest> {
466466
self.lock()?.get().await?.upload_blob(blob, use_case).await

app/buck2_execute_impl/src/executors/caching.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ impl CacheUploader {
416416
.report
417417
.std_streams
418418
.clone()
419-
.into_re(&self.re_client, self.re_use_case)
419+
.into_re(&self.re_client, self.re_use_case, digest_config)
420420
.await
421421
.context("Error accessing std_streams")
422422
};

remote_execution/oss/re_grpc/src/client.rs

+15-5
Original file line numberDiff line numberDiff line change
@@ -758,11 +758,21 @@ impl REClient {
758758

759759
pub async fn upload_blob(
760760
&self,
761-
_blob: Vec<u8>,
762-
_metadata: RemoteExecutionMetadata,
763-
) -> anyhow::Result<TDigest> {
764-
// TODO(aloiscochard)
765-
Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
761+
blob: InlinedBlobWithDigest,
762+
metadata: RemoteExecutionMetadata,
763+
) -> anyhow::Result<()> {
764+
self.upload(
765+
metadata,
766+
UploadRequest {
767+
inlined_blobs_with_digest: Some(vec![blob]),
768+
files_with_digest: None,
769+
directories: None,
770+
upload_only_missing: false,
771+
..Default::default()
772+
},
773+
)
774+
.await?;
775+
Ok(())
766776
}
767777

768778
pub async fn download(

0 commit comments

Comments
 (0)