diff --git a/app/buck2_execute/src/execute/output.rs b/app/buck2_execute/src/execute/output.rs
index c47319a204d85..420054d02f8c9 100644
--- a/app/buck2_execute/src/execute/output.rs
+++ b/app/buck2_execute/src/execute/output.rs
@@ -14,10 +14,12 @@ use buck2_common::file_ops::FileDigest;
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
use buck2_error::BuckErrorContext;
use futures::future;
+use remote_execution::InlinedBlobWithDigest;
use remote_execution::TDigest;
use crate::digest::CasDigestConversionResultExt;
use crate::digest::CasDigestFromReExt;
+use crate::digest::CasDigestToReExt;
use crate::digest_config::DigestConfig;
use crate::re::manager::ManagedRemoteExecutionClient;
use crate::re::streams::RemoteCommandStdStreams;
@@ -238,12 +240,13 @@ impl CommandStdStreams {
self,
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
+ digest_config: DigestConfig,
) -> buck2_error::Result> {
match self {
Self::Local { stdout, stderr } => {
let (stdout, stderr) = future::try_join(
- maybe_upload_to_re(client, use_case, stdout),
- maybe_upload_to_re(client, use_case, stderr),
+ maybe_upload_to_re(client, use_case, stdout, digest_config),
+ maybe_upload_to_re(client, use_case, stderr, digest_config),
)
.await?;
@@ -277,11 +280,17 @@ async fn maybe_upload_to_re(
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
bytes: Vec,
+ digest_config: DigestConfig,
) -> buck2_error::Result {
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
return Ok(ReStdStream::Raw(bytes));
}
- let digest = client.upload_blob(bytes, use_case).await?;
+ let inline_blob = InlinedBlobWithDigest {
+ digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
+ blob: bytes,
+ ..Default::default()
+ };
+ let digest = client.upload_blob(inline_blob, use_case).await?;
Ok(ReStdStream::Digest(digest))
}
diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs
index 865acc3d2d4ac..e054b17bcf04e 100644
--- a/app/buck2_execute/src/re/client.rs
+++ b/app/buck2_execute/src/re/client.rs
@@ -303,7 +303,7 @@ impl RemoteExecutionClient {
pub async fn upload_blob(
&self,
- blob: Vec,
+ blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> buck2_error::Result {
self.data
@@ -1228,17 +1228,19 @@ impl RemoteExecutionClientImpl {
pub async fn upload_blob(
&self,
- blob: Vec,
+ blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> buck2_error::Result {
+ let digest = blob.digest.clone();
with_error_handler(
"upload_blob",
self.get_session_id(),
self.client()
- .upload_blob(blob, use_case.metadata(None))
+ .upload_blob_with_digest(blob.blob, blob.digest, use_case.metadata(None))
.await,
)
- .await
+ .await?;
+ Ok(digest)
}
async fn materialize_files(
diff --git a/app/buck2_execute/src/re/manager.rs b/app/buck2_execute/src/re/manager.rs
index 5811e63537880..5d28299da5b6f 100644
--- a/app/buck2_execute/src/re/manager.rs
+++ b/app/buck2_execute/src/re/manager.rs
@@ -471,7 +471,7 @@ impl ManagedRemoteExecutionClient {
pub async fn upload_blob(
&self,
- blob: Vec,
+ blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> buck2_error::Result {
let use_case = self.re_use_case_override.unwrap_or(use_case);
diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs
index 85b753788d54b..3aab531703c28 100644
--- a/app/buck2_execute_impl/src/executors/caching.rs
+++ b/app/buck2_execute_impl/src/executors/caching.rs
@@ -417,7 +417,7 @@ impl CacheUploader {
.report
.std_streams
.clone()
- .into_re(&self.re_client, self.re_use_case)
+ .into_re(&self.re_client, self.re_use_case, digest_config)
.await
.buck_error_context("Error accessing std_streams")
};
diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs
index f1d860e624130..c1a210ed8bec4 100644
--- a/remote_execution/oss/re_grpc/src/client.rs
+++ b/remote_execution/oss/re_grpc/src/client.rs
@@ -760,13 +760,25 @@ impl REClient {
.await
}
- pub async fn upload_blob(
+ pub async fn upload_blob_with_digest(
&self,
- _blob: Vec,
- _metadata: RemoteExecutionMetadata,
- ) -> anyhow::Result {
- // TODO(aloiscochard)
- Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
+ blob: Vec,
+ digest: TDigest,
+ metadata: RemoteExecutionMetadata,
+ ) -> anyhow::Result<()> {
+ let blob = InlinedBlobWithDigest{digest:digest.clone(), blob, ..Default::default()};
+ self.upload(
+ metadata,
+ UploadRequest {
+ inlined_blobs_with_digest: Some(vec![blob]),
+ files_with_digest: None,
+ directories: None,
+ upload_only_missing: false,
+ ..Default::default()
+ },
+ )
+ .await?;
+ Ok(())
}
pub async fn download(