remote_execution: synchronize concurrent upload of identical artifacts #945
Conversation
@facebook-github-bot has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. (Because this pull request was imported automatically, there will not be any future comments.)

This is an up-to-date refile of #750
```rust
));
// Mark artifact as uploaded and notify other potentially waiting tasks.
if upload_ret.is_ok() {
    prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Done);
```
Does `prev_uploads` grow unbounded? I was expecting to see a removal here to keep it sized to the in-flight digests.
My concern was with the following race:

- Task 1: starts an upload of digest ABC
- Task 2: finds ABC is missing, as the upload hasn't succeeded yet
- Task 1: finishes the upload, notifies other ongoing uploads of the success, and removes the entry from `prev_uploads`
- Task 2: starts an upload, causing it to re-upload the artifact

Thinking about this more, the issue isn't that bad, as RE should immediately respond to the upload request that the blob is fully committed. To remove that final upload, we could also directly populate `REClient::find_missing_cache` once the upload is finished, and check `REClient::find_missing_cache` before starting an upload.
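A minimal sketch of that ordering, assuming `prev_uploads` is a `DashMap` keyed by digest and `find_missing_cache` behaves like a set of known-uploaded digests (the types here are assumptions; only the two names come from the comment above):

```rust
use dashmap::{DashMap, DashSet};

// Sketch only: publish success to the cache checked before new uploads
// start, and only then drop the in-flight entry. A racing task that
// misses `prev_uploads` will then hit `find_missing_cache` and skip the
// re-upload described above.
fn on_upload_success<V>(
    digest: String,
    find_missing_cache: &DashSet<String>,
    prev_uploads: &DashMap<String, V>,
) {
    find_missing_cache.insert(digest.clone());
    prev_uploads.remove(&digest);
}
```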
```rust
enum OngoingUploadStatus {
    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
    Done,
    Error,
}
```
Do we want to store the actual error here?
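One possible shape, sketched under the assumption that a rendered error string is enough for late joiners (this is not the PR's code):

```rust
// Assumption: storing the rendered error lets a task that joins after
// the failure still report why the upload failed.
enum OngoingUploadStatus {
    Active(tokio::sync::watch::Receiver<Result<(), String>>),
    Done,
    Error(String),
}
```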
```rust
enum UploadStatus {
    New(tokio::sync::watch::Sender<Result<(), ()>>),
```
Maybe we can use a oneshot channel? My understanding is that we use it only for synchronization.
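For what it's worth, a `tokio::sync::oneshot` receiver has a single consumer, so fanning the result out to several waiting tasks needs an extra step. A sketch of one way that could be applied here, wrapping the oneshot receiver in a shared future (an assumption about the suggestion, not the PR's code):

```rust
use futures::FutureExt;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // The uploader owns `tx`; every waiter clones the shared future.
    let (tx, rx) = oneshot::channel::<Result<(), ()>>();
    // A dropped sender surfaces as RecvError, which we fold into Err(()).
    let done = rx.map(|r| r.unwrap_or(Err(()))).shared();

    let waiter = done.clone();
    let handle = tokio::spawn(async move {
        // Resolves once the uploader sends (or is dropped).
        let _ = waiter.await;
    });

    let _ = tx.send(Ok(()));
    let _ = done.await;
    let _ = handle.await;
}
```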
```rust
    let _ = tx.send(upload_ret.as_ref().map_err(|_| ()).cloned());
} else {
    prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Error);
    let _ = tx.send(Err(()));
```
We can probably send a real error here; otherwise I don't see the point of this error.
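A sketch of that, reusing the `Error(String)` shape suggested above and assuming the channel carries a rendered error (`notify_failure`, `digest`, and `err` are stand-ins, not the PR's names):

```rust
use dashmap::DashMap;
use tokio::sync::watch;

enum OngoingUploadStatus {
    Done,
    Error(String),
}

// Sketch only: propagate the real failure so waiters can log or return it.
fn notify_failure(
    prev_uploads: &DashMap<String, OngoingUploadStatus>,
    tx: &watch::Sender<Result<(), String>>,
    digest: &str,
    err: &anyhow::Error,
) {
    let msg = format!("{err:#}");
    prev_uploads.alter(digest, |_, _| OngoingUploadStatus::Error(msg.clone()));
    let _ = tx.send(Err(msg));
}
```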
There is a subtle bug in this PR. All of these remote execution functions are executed from tasks that can be canceled. When a second task gets a `tokio::sync::watch::channel` Receiver to wait on the outcome of the first task, you have a problem: the first task may end up being canceled. I'm guessing that will cause the Sender side to drop and the Receiver to get a "channel closed" error. I think the best fix for this would probably be to use shared futures: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.shared

Another interesting observation here is that this PR only deduplicates larger blobs. Smaller blobs will still be uploaded multiple times from multiple ongoing tasks.
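A minimal sketch of that shared-future approach (`UploadDeduper` and `do_upload` are hypothetical names; the real integration would live in the RE client):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use futures::future::{BoxFuture, FutureExt, Shared};

type SharedUpload = Shared<BoxFuture<'static, Result<(), ()>>>;

// The first caller for a digest installs a shared future; later callers
// await a clone of it. If one awaiting task is canceled, the future
// itself survives and keeps being driven by the remaining waiters,
// avoiding the "channel closed" failure described above.
#[derive(Default)]
struct UploadDeduper {
    inflight: Mutex<HashMap<String, SharedUpload>>,
}

impl UploadDeduper {
    async fn upload(&self, digest: String) -> Result<(), ()> {
        let fut = {
            let mut map = self.inflight.lock().unwrap();
            map.entry(digest.clone())
                .or_insert_with(|| do_upload(digest.clone()).boxed().shared())
                .clone()
        };
        let ret = fut.await;
        // Drop the entry once the upload settles, keeping the map sized
        // to the in-flight digests.
        self.inflight.lock().unwrap().remove(&digest);
        ret
    }
}

// Stand-in for the real upload.
async fn do_upload(_digest: String) -> Result<(), ()> {
    Ok(())
}
```

Note that a `Shared` future only makes progress while at least one waiter polls it; if every waiter can be canceled, the underlying upload would additionally need to be spawned as its own task.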
Currently, upload requests are handled in parallel without knowledge of other ongoing requests. If multiple actions depend on the same set of large locally available artifacts, then they will all be uploaded at the same time. This is particularly poor behavior for large files.

Just store in-flight requests in a dashmap, and wait if an upload is already extant.

Authored-by: Hugo van der Wijst <[email protected]>
Signed-off-by: Hugo van der Wijst <[email protected]>
Signed-off-by: Austin Seipp <[email protected]>
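A sketch of the claiming step this describes, reconstructed from the enums shown in the diff above (`claim_upload` is a hypothetical name, and the digest type is simplified to `String`):

```rust
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use tokio::sync::watch;

#[derive(Clone)]
enum OngoingUploadStatus {
    Active(watch::Receiver<Result<(), ()>>),
    Done,
    Error,
}

enum UploadStatus {
    // This task won the race: perform the upload, then send on the channel.
    New(watch::Sender<Result<(), ()>>),
    // Another task is (or was) uploading this digest.
    Ongoing(OngoingUploadStatus),
}

fn claim_upload(
    prev_uploads: &DashMap<String, OngoingUploadStatus>,
    digest: &str,
) -> UploadStatus {
    match prev_uploads.entry(digest.to_owned()) {
        Entry::Occupied(entry) => UploadStatus::Ongoing(entry.get().clone()),
        Entry::Vacant(entry) => {
            let (tx, rx) = watch::channel(Err(()));
            entry.insert(OngoingUploadStatus::Active(rx));
            UploadStatus::New(tx)
        }
    }
}
```

A task that gets `Ongoing(Active(rx))` would then await `rx.changed()` and read the result, which is exactly where the cancellation caveat raised above applies.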