
Commit bc41b62

Hugo van der Wijst authored and thoughtpolice committed
remote_execution: synchronize concurrent upload of identical artifacts
Currently, upload requests are handled in parallel without knowledge of other ongoing requests. If multiple actions depend on the same set of large, locally available artifacts, they will all be uploaded at the same time, which is particularly poor behavior for large files. Instead, store in-flight requests in a DashMap and, if an upload of the same digest is already in flight, wait for it to finish.

Authored-by: Hugo van der Wijst <[email protected]>
Signed-off-by: Hugo van der Wijst <[email protected]>
Signed-off-by: Austin Seipp <[email protected]>
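The synchronization pattern the commit adds is small enough to sketch in isolation: a DashMap entry claims ownership of a digest atomically, and a tokio::sync::watch channel lets any later tasks wait on the first task's result. The sketch below uses hypothetical names (Status, Claim, do_upload, upload_deduped) rather than the commit's actual identifiers (OngoingUploadStatus, upload_impl), and stubs out the network call.

use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::watch;

// What waiters observe for a digest (mirrors OngoingUploadStatus in client.rs).
#[derive(Clone)]
enum Status {
    Active(watch::Receiver<Result<(), ()>>),
    Done,
    Error,
}

// What the current task learned when it touched the map entry.
enum Claim {
    New(watch::Sender<Result<(), ()>>),
    Ongoing(Status),
}

// Hypothetical stand-in for the actual ByteStream write.
async fn do_upload(_digest: &str) -> anyhow::Result<()> {
    Ok(())
}

async fn upload_deduped(
    uploads: Arc<DashMap<String, Status>>,
    digest: String,
) -> anyhow::Result<()> {
    // entry() locks the shard, so we atomically either claim the upload
    // (Vacant) or observe an existing one (Occupied).
    let claim = match uploads.entry(digest.clone()) {
        dashmap::mapref::entry::Entry::Occupied(o) => Claim::Ongoing(o.get().clone()),
        dashmap::mapref::entry::Entry::Vacant(v) => {
            let (tx, rx) = watch::channel(Err(()));
            v.insert(Status::Active(rx));
            Claim::New(tx)
        }
    };
    match claim {
        // Someone already finished this digest; nothing to do.
        Claim::Ongoing(Status::Done) => Ok(()),
        Claim::Ongoing(Status::Error) => Err(anyhow::anyhow!("upload by another task failed")),
        Claim::Ongoing(Status::Active(mut rx)) => {
            // Wait for the owning task to publish a result, then report it.
            rx.changed().await?;
            rx.borrow_and_update()
                .as_ref()
                .map_err(|_| anyhow::anyhow!("upload by another task failed"))?;
            Ok(())
        }
        Claim::New(tx) => {
            let result = do_upload(&digest).await;
            // Record the terminal state and wake waiters *before* propagating
            // any error, so no waiter is left parked on the channel.
            uploads.alter(&digest, |_, _| {
                if result.is_ok() { Status::Done } else { Status::Error }
            });
            let _ = tx.send(result.as_ref().map(|_| ()).map_err(|_| ()));
            result
        }
    }
}

The ordering in the New arm is the key design point, and the real code comments on it explicitly: the map is updated and the watch channel is signalled before the error propagates, so a failed upload can never leave other tasks blocked.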
1 parent: c21d1fa

3 files changed (+108, -37)

remote_execution/oss/re_grpc/BUCK

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ rust_library(
     ],
     deps = [
         "fbsource//third-party/rust:anyhow",
+        "fbsource//third-party/rust:dashmap",
         "fbsource//third-party/rust:futures",
         "fbsource//third-party/rust:http",
         "fbsource//third-party/rust:lru",

remote_execution/oss/re_grpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ version = "0.1.0"
 [dependencies]
 anyhow = { workspace = true }
 dupe = { workspace = true }
+dashmap = { workspace = true }
 futures = { workspace = true }
 gazebo = { workspace = true }
 http = { workspace = true }

remote_execution/oss/re_grpc/src/client.rs

Lines changed: 106 additions & 37 deletions
@@ -19,6 +19,7 @@ use std::time::Instant;
 use anyhow::Context;
 use buck2_re_configuration::Buck2OssReConfiguration;
 use buck2_re_configuration::HttpHeader;
+use dashmap::DashMap;
 use dupe::Dupe;
 use futures::Stream;
 use futures::future::BoxFuture;
@@ -491,13 +492,21 @@ impl FindMissingCache {
     }
 }
 
+#[derive(Clone)]
+enum OngoingUploadStatus {
+    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
+    Done,
+    Error,
+}
+
 pub struct REClient {
     runtime_opts: RERuntimeOpts,
     grpc_clients: GRPCClients,
     capabilities: RECapabilities,
     instance_name: InstanceName,
     // buck2 calls find_missing for same blobs
     find_missing_cache: Mutex<FindMissingCache>,
+    prev_uploads: DashMap<TDigest, OngoingUploadStatus>,
 }
 
 impl Drop for REClient {
@@ -572,6 +581,7 @@ impl REClient {
                 ttl: Duration::from_secs(12 * 60 * 60), // 12 hours TODO: Tune this parameter
                 last_check: Instant::now(),
             }),
+            prev_uploads: DashMap::new(),
         }
     }
 
@@ -759,6 +769,7 @@ impl REClient {
             request,
             self.capabilities.max_total_batch_size,
             self.runtime_opts.max_concurrent_uploads_per_action,
+            &self.prev_uploads,
             |re_request| async {
                 let metadata = metadata.clone();
                 let mut cas_client = self.grpc_clients.cas_client.clone();
@@ -1319,6 +1330,7 @@ async fn upload_impl<Byt, Cas>(
     request: UploadRequest,
     max_total_batch_size: usize,
     max_concurrent_uploads: Option<usize>,
+    prev_uploads: &DashMap<TDigest, OngoingUploadStatus>,
     cas_f: impl Fn(BatchUpdateBlobsRequest) -> Cas + Sync + Send + Copy,
     bystream_fut: impl Fn(Vec<WriteRequest>) -> Byt + Sync + Send + Copy,
 ) -> anyhow::Result<UploadResponse>
@@ -1379,10 +1391,9 @@ where
 
     // Create futures for any files that needs uploading.
     for file in request.files_with_digest.unwrap_or_default() {
-        let hash = file.digest.hash.clone();
-        let size = file.digest.size_in_bytes;
+        let digest = file.digest.clone();
         let name = file.name.clone();
-        if size < max_total_batch_size as i64 {
+        if digest.size_in_bytes < max_total_batch_size as i64 {
             batched_blob_updates.push(BatchUploadRequest::File(file));
             continue;
         }
@@ -1391,45 +1402,96 @@ where
             "{}uploads/{}/blobs/{}/{}",
             instance_name.as_resource_prefix(),
             client_uuid,
-            hash.clone(),
-            size
+            file.digest.hash,
+            file.digest.size_in_bytes
         );
+
+        enum UploadStatus {
+            New(tokio::sync::watch::Sender<Result<(), ()>>),
+            Ongoing(OngoingUploadStatus),
+        }
+
+        let upload_status = match prev_uploads.entry(digest.clone()) {
+            dashmap::mapref::entry::Entry::Occupied(o) => UploadStatus::Ongoing(o.get().clone()),
+            dashmap::mapref::entry::Entry::Vacant(v) => {
+                let (tx, rx) = tokio::sync::watch::channel(Err(()));
+                v.insert(OngoingUploadStatus::Active(rx));
+                UploadStatus::New(tx)
+            }
+        };
         let fut = async move {
-            let mut file = tokio::fs::File::open(&name)
-                .await
-                .with_context(|| format!("Opening `{name}` for reading failed"))?;
-            let mut data = vec![0; max_total_batch_size];
-
-            let mut write_offset = 0;
-            let mut upload_segments = Vec::new();
-            loop {
-                let length = file
-                    .read(&mut data)
-                    .await
-                    .with_context(|| format!("Error reading from {name}"))?;
-                if length == 0 {
-                    break;
+            match upload_status {
+                UploadStatus::Ongoing(OngoingUploadStatus::Active(mut rx)) => {
+                    // Another task was already uploading this artifact; wait for it to complete and report the result.
+                    rx.changed().await?;
+                    rx.borrow_and_update().as_ref().map_err(|_e| {
+                        anyhow::anyhow!("Upload queued for previous action failed.")
+                    })?;
                 }
-                upload_segments.push(WriteRequest {
-                    resource_name: resource_name.to_owned(),
-                    write_offset,
-                    finish_write: false,
-                    data: data[..length].to_owned(),
-                });
-                write_offset += length as i64;
-            }
-            upload_segments
-                .last_mut()
-                .with_context(|| format!("Read no segments from `{name} "))?
-                .finish_write = true;
+                UploadStatus::Ongoing(OngoingUploadStatus::Done) => {
+                    // Another task has already completed the upload of this artifact; no need to do any work.
+                }
+                UploadStatus::Ongoing(OngoingUploadStatus::Error) => {
+                    // Another task tried to perform the transmission, but failed.
+                    return Err(anyhow::anyhow!("Upload queued for previous action failed."));
+                }
+                UploadStatus::New(tx) => {
+                    let mut file = tokio::fs::File::open(&name)
+                        .await
+                        .with_context(|| format!("Opening `{name}` for reading failed"))?;
+                    let mut data = vec![0; max_total_batch_size];
+
+                    let mut write_offset = 0;
+                    let mut upload_segments = Vec::new();
+                    loop {
+                        let length = file
+                            .read(&mut data)
+                            .await
+                            .with_context(|| format!("Error reading from {name}"))?;
+                        if length == 0 {
+                            break;
+                        }
+                        upload_segments.push(WriteRequest {
+                            resource_name: resource_name.to_owned(),
+                            write_offset,
+                            finish_write: false,
+                            data: data[..length].to_owned(),
+                        });
+                        write_offset += length as i64;
+                    }
+                    upload_segments
+                        .last_mut()
+                        .with_context(|| format!("Read no segments from `{name}`"))?
+                        .finish_write = true;
+
+                    let upload_ret = bystream_fut(upload_segments)
+                        .await
+                        .and_then(|resp| {
+                            if resp.committed_size != digest.size_in_bytes {
+                                Err(anyhow::anyhow!(
+                                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
+                                ))
+                            } else {
+                                Ok(())
+                            }
+                        });
 
-            let resp = bystream_fut(upload_segments).await?;
-            if resp.committed_size != size {
-                return Err(anyhow::anyhow!(
-                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
-                ));
+                    // Mark the artifact as uploaded and notify other potentially waiting tasks.
+                    if upload_ret.is_ok() {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Done);
+                        let _ = tx.send(upload_ret.as_ref().map_err(|_| ()).cloned());
+                    } else {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Error);
+                        let _ = tx.send(Err(()));
+                    }
+
+                    // Only propagate errors _after_ notifying other waiting tasks that this task is complete.
+                    upload_ret?;
+                }
             }
-            Ok(vec![hash])
+
+            Ok(vec![digest.hash])
         };
         upload_futures.push(Box::pin(fut));
     }
@@ -2204,6 +2266,7 @@ mod tests {
         req,
         10000,
         None,
+        &DashMap::new(),
        |req| {
            let res = res.clone();
            let digest1 = digest1.clone();
@@ -2287,6 +2350,7 @@ mod tests {
        req,
        10, // kept small to simulate a large file upload
        None,
+       &DashMap::new(),
        |req| {
            let res = res.clone();
            let digest1 = digest1.clone();
@@ -2361,6 +2425,7 @@ mod tests {
        req,
        10, // kept small to simulate a large inlined upload
        None,
+       &DashMap::new(),
        |req| {
            let res = res.clone();
            let digest1 = digest1.clone();
@@ -2422,6 +2487,7 @@ mod tests {
        req,
        10,
        None,
+       &DashMap::new(),
        |_req| async move {
            panic!("This should not be called as there are no blobs to upload in batch");
        },
@@ -2483,6 +2549,7 @@ mod tests {
        req,
        3,
        None,
+       &DashMap::new(),
        |_req| async move {
            panic!("Not called");
        },
@@ -2524,6 +2591,7 @@ mod tests {
        req,
        0,
        None,
+       &DashMap::new(),
        |_req| async move {
            panic!("Not called");
        },
@@ -2570,6 +2638,7 @@ mod tests {
        req,
        1,
        None,
+       &DashMap::new(),
        |_req| async move {
            panic!("Not called");
        },
