Skip to content

Commit 6e1e32e

Browse files
buck2_execute: implement OSS upload_blob for local_only cache uploads
Forward-port of patch 4 in <#477>, providing a clear piece of missing functionality: in the event that stdout or stderr were more than 50KiB of output when caching `local_only` actions, then this dead path was taken, and so stdout/stderr would not be uploaded successfully in the cache. Co-authored-by: Austin Seipp <[email protected]>
1 parent 1612758 commit 6e1e32e

File tree

5 files changed

+34
-13
lines changed

5 files changed

+34
-13
lines changed

app/buck2_execute/src/execute/output.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ use buck2_common::file_ops::FileDigest;
1414
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
1515
use buck2_error::BuckErrorContext;
1616
use futures::future;
17+
use remote_execution::InlinedBlobWithDigest;
1718
use remote_execution::TDigest;
1819

1920
use crate::digest::CasDigestConversionResultExt;
2021
use crate::digest::CasDigestFromReExt;
22+
use crate::digest::CasDigestToReExt;
2123
use crate::digest_config::DigestConfig;
2224
use crate::re::manager::ManagedRemoteExecutionClient;
2325
use crate::re::streams::RemoteCommandStdStreams;
@@ -238,12 +240,13 @@ impl CommandStdStreams {
238240
self,
239241
client: &ManagedRemoteExecutionClient,
240242
use_case: RemoteExecutorUseCase,
243+
digest_config: DigestConfig,
241244
) -> buck2_error::Result<StdStreamPair<ReStdStream>> {
242245
match self {
243246
Self::Local { stdout, stderr } => {
244247
let (stdout, stderr) = future::try_join(
245-
maybe_upload_to_re(client, use_case, stdout),
246-
maybe_upload_to_re(client, use_case, stderr),
248+
maybe_upload_to_re(client, use_case, stdout, digest_config),
249+
maybe_upload_to_re(client, use_case, stderr, digest_config),
247250
)
248251
.await?;
249252

@@ -277,11 +280,17 @@ async fn maybe_upload_to_re(
277280
client: &ManagedRemoteExecutionClient,
278281
use_case: RemoteExecutorUseCase,
279282
bytes: Vec<u8>,
283+
digest_config: DigestConfig,
280284
) -> buck2_error::Result<ReStdStream> {
281285
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
282286
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
283287
return Ok(ReStdStream::Raw(bytes));
284288
}
285-
let digest = client.upload_blob(bytes, use_case).await?;
289+
let inline_blob = InlinedBlobWithDigest {
290+
digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
291+
blob: bytes,
292+
..Default::default()
293+
};
294+
let digest = client.upload_blob(inline_blob, use_case).await?;
286295
Ok(ReStdStream::Digest(digest))
287296
}

app/buck2_execute/src/re/client.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ impl RemoteExecutionClient {
297297

298298
pub async fn upload_blob(
299299
&self,
300-
blob: Vec<u8>,
300+
blob: InlinedBlobWithDigest,
301301
use_case: RemoteExecutorUseCase,
302302
) -> buck2_error::Result<TDigest> {
303303
self.data
@@ -1209,17 +1209,19 @@ impl RemoteExecutionClientImpl {
12091209

12101210
pub async fn upload_blob(
12111211
&self,
1212-
blob: Vec<u8>,
1212+
blob: InlinedBlobWithDigest,
12131213
use_case: RemoteExecutorUseCase,
12141214
) -> buck2_error::Result<TDigest> {
1215+
let digest = blob.digest.clone();
12151216
with_error_handler(
12161217
"upload_blob",
12171218
self.get_session_id(),
12181219
self.client()
12191220
.upload_blob(blob, use_case.metadata(None))
12201221
.await,
12211222
)
1222-
.await
1223+
.await?;
1224+
Ok(digest)
12231225
}
12241226

12251227
async fn materialize_files(

app/buck2_execute/src/re/manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ impl ManagedRemoteExecutionClient {
468468

469469
pub async fn upload_blob(
470470
&self,
471-
blob: Vec<u8>,
471+
blob: InlinedBlobWithDigest,
472472
use_case: RemoteExecutorUseCase,
473473
) -> buck2_error::Result<TDigest> {
474474
let use_case = self.re_use_case_override.unwrap_or(use_case);

app/buck2_execute_impl/src/executors/caching.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ impl CacheUploader {
417417
.report
418418
.std_streams
419419
.clone()
420-
.into_re(&self.re_client, self.re_use_case)
420+
.into_re(&self.re_client, self.re_use_case, digest_config)
421421
.await
422422
.buck_error_context("Error accessing std_streams")
423423
};

remote_execution/oss/re_grpc/src/client.rs

+15-5
Original file line numberDiff line numberDiff line change
@@ -762,11 +762,21 @@ impl REClient {
762762

763763
pub async fn upload_blob(
764764
&self,
765-
_blob: Vec<u8>,
766-
_metadata: RemoteExecutionMetadata,
767-
) -> anyhow::Result<TDigest> {
768-
// TODO(aloiscochard)
769-
Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
765+
blob: InlinedBlobWithDigest,
766+
metadata: RemoteExecutionMetadata,
767+
) -> anyhow::Result<()> {
768+
self.upload(
769+
metadata,
770+
UploadRequest {
771+
inlined_blobs_with_digest: Some(vec![blob]),
772+
files_with_digest: None,
773+
directories: None,
774+
upload_only_missing: false,
775+
..Default::default()
776+
},
777+
)
778+
.await?;
779+
Ok(())
770780
}
771781

772782
pub async fn download(

0 commit comments

Comments
 (0)