@@ -19,6 +19,7 @@ use std::time::Instant;
 use anyhow::Context;
 use buck2_re_configuration::Buck2OssReConfiguration;
 use buck2_re_configuration::HttpHeader;
+use dashmap::DashMap;
 use dupe::Dupe;
 use futures::Stream;
 use futures::future::BoxFuture;
@@ -491,13 +492,21 @@ impl FindMissingCache {
     }
 }
 
+#[derive(Clone)]
+enum OngoingUploadStatus {
+    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
+    Done,
+    Error,
+}
+
 pub struct REClient {
     runtime_opts: RERuntimeOpts,
     grpc_clients: GRPCClients,
     capabilities: RECapabilities,
     instance_name: InstanceName,
     // buck2 calls find_missing for same blobs
     find_missing_cache: Mutex<FindMissingCache>,
+    prev_uploads: DashMap<TDigest, OngoingUploadStatus>,
 }
 
 impl Drop for REClient {
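
The `prev_uploads` map added above deduplicates concurrent uploads of the same blob: the first task to claim a digest becomes the uploader and keeps the `tokio::sync::watch::Sender`, while any task that finds an `Active` entry waits on a clone of the stored `Receiver`. Below is a minimal, self-contained sketch of that pattern — not the buck2 implementation — assuming only the `tokio` and `dashmap` crates; the `Status` enum and `String` key are hypothetical stand-ins for `OngoingUploadStatus` and `TDigest`.

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;

// Hypothetical stand-in for OngoingUploadStatus, keyed by a String digest.
#[derive(Clone)]
enum Status {
    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
    Done,
    Error,
}

#[tokio::main]
async fn main() {
    let uploads: Arc<DashMap<String, Status>> = Arc::new(DashMap::new());

    let mut handles = Vec::new();
    for i in 0..4 {
        let uploads = Arc::clone(&uploads);
        handles.push(tokio::spawn(async move {
            let key = "blob-digest".to_owned();
            // Claim the entry if vacant (we become the uploader); otherwise
            // observe whatever state the current uploader left behind. The
            // shard lock held by entry() is released before any await.
            let claimed = match uploads.entry(key.clone()) {
                dashmap::mapref::entry::Entry::Occupied(o) => Err(o.get().clone()),
                dashmap::mapref::entry::Entry::Vacant(v) => {
                    let (tx, rx) = tokio::sync::watch::channel(Err(()));
                    v.insert(Status::Active(rx));
                    Ok(tx)
                }
            };
            match claimed {
                Ok(tx) => {
                    // Pretend to upload, then flip the map entry *before*
                    // notifying waiters, mirroring the ordering in the patch.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    uploads.alter(&key, |_, _| Status::Done);
                    let _ = tx.send(Ok(()));
                    println!("task {i}: uploaded");
                }
                Err(Status::Active(mut rx)) => {
                    // The Receiver stored in the map never observed a send, so
                    // a clone of it wakes as soon as the uploader publishes.
                    rx.changed().await.expect("uploader dropped without sending");
                    println!("task {i}: waited, result = {:?}", *rx.borrow());
                }
                Err(Status::Done) => println!("task {i}: already uploaded"),
                Err(Status::Error) => println!("task {i}: previous upload failed"),
            }
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
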
@@ -572,6 +581,7 @@ impl REClient {
                 ttl: Duration::from_secs(12 * 60 * 60), // 12 hours TODO: Tune this parameter
                 last_check: Instant::now(),
             }),
+            prev_uploads: DashMap::new(),
         }
     }
@@ -759,6 +769,7 @@ impl REClient {
             request,
             self.capabilities.max_total_batch_size,
             self.runtime_opts.max_concurrent_uploads_per_action,
+            &self.prev_uploads,
             |re_request| async {
                 let metadata = metadata.clone();
                 let mut cas_client = self.grpc_clients.cas_client.clone();
@@ -1319,6 +1330,7 @@ async fn upload_impl<Byt, Cas>(
     request: UploadRequest,
     max_total_batch_size: usize,
     max_concurrent_uploads: Option<usize>,
+    prev_uploads: &DashMap<TDigest, OngoingUploadStatus>,
    cas_f: impl Fn(BatchUpdateBlobsRequest) -> Cas + Sync + Send + Copy,
    bystream_fut: impl Fn(Vec<WriteRequest>) -> Byt + Sync + Send + Copy,
) -> anyhow::Result<UploadResponse>
@@ -1379,10 +1391,9 @@ where
 
     // Create futures for any files that need uploading.
     for file in request.files_with_digest.unwrap_or_default() {
-        let hash = file.digest.hash.clone();
-        let size = file.digest.size_in_bytes;
+        let digest = file.digest.clone();
         let name = file.name.clone();
-        if size < max_total_batch_size as i64 {
+        if digest.size_in_bytes < max_total_batch_size as i64 {
             batched_blob_updates.push(BatchUploadRequest::File(file));
             continue;
         }
@@ -1391,45 +1402,96 @@ where
             "{}uploads/{}/blobs/{}/{}",
             instance_name.as_resource_prefix(),
             client_uuid,
-            hash.clone(),
-            size
+            file.digest.hash,
+            file.digest.size_in_bytes
         );
+
+        enum UploadStatus {
+            New(tokio::sync::watch::Sender<Result<(), ()>>),
+            Ongoing(OngoingUploadStatus),
+        }
+
+        let upload_status = match prev_uploads.entry(digest.clone()) {
+            dashmap::mapref::entry::Entry::Occupied(o) => UploadStatus::Ongoing(o.get().clone()),
+            dashmap::mapref::entry::Entry::Vacant(v) => {
+                let (tx, rx) = tokio::sync::watch::channel(Err(()));
+                v.insert(OngoingUploadStatus::Active(rx));
+                UploadStatus::New(tx)
+            }
+        };
 
         let fut = async move {
-            let mut file = tokio::fs::File::open(&name)
-                .await
-                .with_context(|| format!("Opening `{name}` for reading failed"))?;
-            let mut data = vec![0; max_total_batch_size];
-
-            let mut write_offset = 0;
-            let mut upload_segments = Vec::new();
-            loop {
-                let length = file
-                    .read(&mut data)
-                    .await
-                    .with_context(|| format!("Error reading from {name}"))?;
-                if length == 0 {
-                    break;
+            match upload_status {
+                UploadStatus::Ongoing(OngoingUploadStatus::Active(mut rx)) => {
+                    // Another task was already uploading this artifact; wait for it to complete and report the result.
+                    rx.changed().await?;
+                    rx.borrow_and_update().as_ref().map_err(|_e| {
+                        anyhow::anyhow!("Upload queued for previous action failed.")
+                    })?;
                 }
-                upload_segments.push(WriteRequest {
-                    resource_name: resource_name.to_owned(),
-                    write_offset,
-                    finish_write: false,
-                    data: data[..length].to_owned(),
-                });
-                write_offset += length as i64;
-            }
-            upload_segments
-                .last_mut()
-                .with_context(|| format!("Read no segments from `{name}`"))?
-                .finish_write = true;
+                UploadStatus::Ongoing(OngoingUploadStatus::Done) => {
+                    // Another task has already completed the upload of this artifact; no need to do any work.
+                }
+                UploadStatus::Ongoing(OngoingUploadStatus::Error) => {
+                    // Another task tried to perform the transmission, but failed.
+                    return Err(anyhow::anyhow!("Upload queued for previous action failed."));
+                }
+                UploadStatus::New(tx) => {
+                    let mut file = tokio::fs::File::open(&name)
+                        .await
+                        .with_context(|| format!("Opening `{name}` for reading failed"))?;
+                    let mut data = vec![0; max_total_batch_size];
+
+                    let mut write_offset = 0;
+                    let mut upload_segments = Vec::new();
+                    loop {
+                        let length = file
+                            .read(&mut data)
+                            .await
+                            .with_context(|| format!("Error reading from {name}"))?;
+                        if length == 0 {
+                            break;
+                        }
+                        upload_segments.push(WriteRequest {
+                            resource_name: resource_name.to_owned(),
+                            write_offset,
+                            finish_write: false,
+                            data: data[..length].to_owned(),
+                        });
+                        write_offset += length as i64;
+                    }
+                    upload_segments
+                        .last_mut()
+                        .with_context(|| format!("Read no segments from `{name}`"))?
+                        .finish_write = true;
+
+                    let upload_ret = bystream_fut(upload_segments)
+                        .await
+                        .and_then(|resp| {
+                            if resp.committed_size != digest.size_in_bytes {
+                                Err(anyhow::anyhow!(
+                                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
+                                ))
+                            }
+                            else {
+                                Ok(())
+                            }
+                        });
 
-            let resp = bystream_fut(upload_segments).await?;
-            if resp.committed_size != size {
-                return Err(anyhow::anyhow!(
-                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
-                ));
+                    // Mark the artifact as uploaded and notify other potentially waiting tasks.
+                    if upload_ret.is_ok() {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Done);
+                        let _ = tx.send(upload_ret.as_ref().map_err(|_| ()).cloned());
+                    } else {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Error);
+                        let _ = tx.send(Err(()));
+                    }
+
+                    // Only propagate errors _after_ notifying other waiting tasks that this task is complete.
+                    upload_ret?;
+                }
             }
-            Ok(vec![hash])
+
+            Ok(vec![digest.hash])
         };
         upload_futures.push(Box::pin(fut));
     }
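
The `UploadStatus::New` branch above reads the file in `max_total_batch_size`-sized chunks and turns each chunk into a ByteStream `WriteRequest`, setting `finish_write` only on the final segment. The following is a minimal sketch of just that segmentation logic, assuming `tokio` and `anyhow` as dependencies; the `WriteRequest` struct here is a hypothetical mirror of the proto fields used above, and the resource name is an illustrative placeholder.

use anyhow::Context;
use tokio::io::AsyncReadExt;

// Hypothetical mirror of the ByteStream proto message fields used above.
#[derive(Debug)]
struct WriteRequest {
    resource_name: String,
    write_offset: i64,
    finish_write: bool,
    data: Vec<u8>,
}

// Split a reader into fixed-size segments; only the last sets finish_write.
async fn segment(
    mut reader: impl tokio::io::AsyncRead + Unpin,
    resource_name: &str,
    max_batch: usize,
) -> anyhow::Result<Vec<WriteRequest>> {
    let mut buf = vec![0; max_batch];
    let mut write_offset = 0;
    let mut segments = Vec::new();
    loop {
        let length = reader.read(&mut buf).await.context("read failed")?;
        if length == 0 {
            break;
        }
        segments.push(WriteRequest {
            resource_name: resource_name.to_owned(),
            write_offset,
            finish_write: false,
            data: buf[..length].to_owned(),
        });
        write_offset += length as i64;
    }
    // An empty read is an error: the server expects at least one terminal segment.
    segments.last_mut().context("read no segments")?.finish_write = true;
    Ok(segments)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 10 bytes split into 4-byte segments -> offsets 0, 4, 8; last has finish_write.
    let segments = segment(&[0u8; 10][..], "uploads/xyz/blobs/abc/10", 4).await?;
    for s in &segments {
        println!("offset={} len={} finish={}", s.write_offset, s.data.len(), s.finish_write);
    }
    Ok(())
}
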
@@ -2204,6 +2266,7 @@ mod tests {
             req,
             10000,
             None,
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2287,6 +2350,7 @@ mod tests {
             req,
             10, // kept small to simulate a large file upload
             None,
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2361,6 +2425,7 @@ mod tests {
             req,
             10, // kept small to simulate a large inlined upload
             None,
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2422,6 +2487,7 @@ mod tests {
             req,
             10,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("This should not be called as there are no blobs to upload in batch");
             },
@@ -2483,6 +2549,7 @@ mod tests {
             req,
             3,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2524,6 +2591,7 @@ mod tests {
             req,
             0,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2570,6 +2638,7 @@ mod tests {
             req,
             1,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },