Skip to content

Commit 585a276

Browse files
RajivTSfacebook-github-bot
authored andcommitted
Add tokio::spawn while importing raw git trees during gitimport and remote-gitimport
Summary: As part of importing a commit in Mononoke, we recursively fetch all the trees associated with the current commit and its parent and do a manifest diff to find out which trees have been modified or added. These trees then get uploaded along with the commit. However, because of the nesting of the futures and collecting them in `FuturedUnordered` where the underlying future can acquire a lock and do other processing, we run the risk of executing without a nearby await point which can lead to the [subexecutor problem](https://fb.workplace.com/notes/1722678438098836/). This issue was reported by AOSP folks (T175797996) and so far we did not have a reliable repro. When Saurabh tried to import `frl-silicon-reefpoint`, we realized that it gets stuck everytime at the same commit. This diff fixes that issue by adding spawns in the nested futures where `GitReader` gets used. I will still keep the task open until AOSP folks confirm that they are no longer seeing this issue. Reviewed By: singhsrb Differential Revision: D53921389 fbshipit-source-id: af6e0969addc8346046f8eb59987d11606431afb
1 parent ed62b66 commit 585a276

File tree

3 files changed

+113
-107
lines changed

3 files changed

+113
-107
lines changed

eden/mononoke/git/import_tools/src/lib.rs

+40-37
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,8 @@ pub async fn gitimport_acc<Uploader: GitUploader>(
329329
.try_buffered(prefs.concurrency)
330330
.and_then(|(extracted_commit, file_changes)| {
331331
let acc = &acc;
332-
let uploader = &uploader;
333332
let repo_name = &repo_name;
334-
let reader = &reader;
333+
cloned!(uploader, reader, ctx);
335334
async move {
336335
let oid = extracted_commit.metadata.oid;
337336
let bonsai_parents = extracted_commit
@@ -356,40 +355,44 @@ pub async fn gitimport_acc<Uploader: GitUploader>(
356355
// Before generating the corresponding changeset at Mononoke end, upload the raw git commit
357356
// and the git tree pointed to by the git commit.
358357
extracted_commit
359-
.changed_trees(ctx, reader)
358+
.changed_trees(&ctx, &reader)
360359
.map_ok(|entry| {
361-
cloned!(oid);
360+
cloned!(oid, uploader, reader, ctx);
362361
async move {
363-
let tree_for_commit =
364-
read_raw_object(reader, &entry.0).await.with_context(|| {
365-
format_err!(
366-
"Failed to fetch git tree {} for commit {}",
367-
entry.0,
368-
oid
369-
)
370-
})?;
371-
let tree_bytes = tree_for_commit.clone();
372-
// Upload packfile base item for given tree object and the raw Git tree
373-
let packfile_item_upload = async {
374-
uploader
375-
.upload_packfile_base_item(ctx, entry.0, tree_for_commit)
376-
.await
377-
.with_context(|| {
378-
format_err!(
379-
"Failed to upload packfile item for git tree {} for commit {}",
380-
entry.0,
381-
oid
382-
)
383-
})
384-
};
385-
let git_tree_upload = async {
386-
uploader.upload_object(ctx, entry.0, tree_bytes).await
387-
.with_context(|| {
388-
format_err!("Failed to upload raw git tree {} for commit {}", entry.0, oid)
389-
})
390-
};
391-
try_join!(packfile_item_upload, git_tree_upload)?;
392-
anyhow::Ok(())
362+
tokio::spawn(async move {
363+
let tree_for_commit =
364+
read_raw_object(&reader, &entry.0).await.with_context(|| {
365+
format_err!(
366+
"Failed to fetch git tree {} for commit {}",
367+
entry.0,
368+
oid
369+
)
370+
})?;
371+
let tree_bytes = tree_for_commit.clone();
372+
// Upload packfile base item for given tree object and the raw Git tree
373+
let packfile_item_upload = async {
374+
uploader
375+
.upload_packfile_base_item(&ctx, entry.0, tree_for_commit)
376+
.await
377+
.with_context(|| {
378+
format_err!(
379+
"Failed to upload packfile item for git tree {} for commit {}",
380+
entry.0,
381+
oid
382+
)
383+
})
384+
};
385+
let git_tree_upload = async {
386+
uploader
387+
.upload_object(&ctx, entry.0, tree_bytes).await
388+
.with_context(|| {
389+
format_err!("Failed to upload raw git tree {} for commit {}", entry.0, oid)
390+
})
391+
};
392+
try_join!(packfile_item_upload, git_tree_upload)?;
393+
anyhow::Ok(())
394+
})
395+
.await?
393396
}
394397
})
395398
.try_buffer_unordered(100)
@@ -399,7 +402,7 @@ pub async fn gitimport_acc<Uploader: GitUploader>(
399402
let packfile_item_upload = async {
400403
uploader
401404
.upload_packfile_base_item(
402-
ctx,
405+
&ctx,
403406
oid,
404407
extracted_commit.original_commit.clone(),
405408
)
@@ -410,15 +413,15 @@ pub async fn gitimport_acc<Uploader: GitUploader>(
410413
};
411414
let git_commit_upload = async {
412415
uploader
413-
.upload_object(ctx, oid, extracted_commit.original_commit.clone())
416+
.upload_object(&ctx, oid, extracted_commit.original_commit.clone())
414417
.await
415418
.with_context(|| format_err!("Failed to upload raw git commit {}", oid))
416419
};
417420
try_join!(packfile_item_upload, git_commit_upload)?;
418421
// Upload Git commit
419422
let (int_cs, bcs_id) = uploader
420423
.generate_changeset_for_commit(
421-
ctx,
424+
&ctx,
422425
bonsai_parents,
423426
extracted_commit.metadata,
424427
file_changes,

eden/mononoke/manifest/src/ops.rs

+70-67
Original file line numberDiff line numberDiff line change
@@ -75,49 +75,49 @@ where
7575
init,
7676
move |(manifest_id, selector, path, recursive)| {
7777
let (select, subentries) = selector.deconstruct();
78-
78+
cloned!(ctx, store);
7979
async move {
80-
let manifest = manifest_id.load(ctx, store).await?;
81-
82-
let mut output = Vec::new();
83-
let mut recurse = Vec::new();
84-
85-
if recursive || select.is_recursive() {
86-
output.push((path.clone(), Entry::Tree(manifest_id)));
87-
let mut stream = manifest.list(ctx, store).await?;
88-
while let Some((name, entry)) = stream.try_next().await? {
89-
let path = path.join(&name);
90-
match entry {
91-
Entry::Leaf(_) => {
92-
output.push((path.clone(), entry));
93-
}
94-
Entry::Tree(manifest_id) => {
95-
recurse.push((manifest_id, Default::default(), path, true));
96-
}
97-
}
98-
}
99-
} else {
100-
if select.is_selected() {
80+
tokio::spawn(async move {
81+
let manifest = manifest_id.load(&ctx, &store).await?;
82+
let mut output = Vec::new();
83+
let mut recurse = Vec::new();
84+
if recursive || select.is_recursive() {
10185
output.push((path.clone(), Entry::Tree(manifest_id)));
102-
}
103-
for (name, selector) in subentries {
104-
if let Some(entry) = manifest.lookup(ctx, store, &name).await? {
86+
let mut stream = manifest.list(&ctx, &store).await?;
87+
while let Some((name, entry)) = stream.try_next().await? {
10588
let path = path.join(&name);
10689
match entry {
10790
Entry::Leaf(_) => {
108-
if selector.value.is_selected() {
109-
output.push((path.clone(), entry));
110-
}
91+
output.push((path.clone(), entry));
11192
}
11293
Entry::Tree(manifest_id) => {
113-
recurse.push((manifest_id, selector, path, false));
94+
recurse.push((manifest_id, Default::default(), path, true));
95+
}
96+
}
97+
}
98+
} else {
99+
if select.is_selected() {
100+
output.push((path.clone(), Entry::Tree(manifest_id)));
101+
}
102+
for (name, selector) in subentries {
103+
if let Some(entry) = manifest.lookup(&ctx, &store, &name).await? {
104+
let path = path.join(&name);
105+
match entry {
106+
Entry::Leaf(_) => {
107+
if selector.value.is_selected() {
108+
output.push((path.clone(), entry));
109+
}
110+
}
111+
Entry::Tree(manifest_id) => {
112+
recurse.push((manifest_id, selector, path, false));
113+
}
114114
}
115115
}
116116
}
117117
}
118-
}
119118

120-
Ok::<_, Error>((output, recurse))
119+
Ok::<_, Error>((output, recurse))
120+
}).await?
121121
}.boxed()
122122
},
123123
)
@@ -507,50 +507,53 @@ where
507507
{
508508
match diff_against.first().cloned() {
509509
Some(parent) => async move {
510-
let mut new_entries = Vec::new();
511-
let mut parent_diff = parent.diff(ctx.clone(), store.clone(), mf_id);
512-
while let Some(diff_entry) = parent_diff.try_next().await? {
513-
match diff_entry {
514-
Diff::Added(path, entry) => new_entries.push((path, entry, vec![])),
515-
Diff::Removed(..) => continue,
516-
Diff::Changed(path, parent_entry, entry) => {
517-
new_entries.push((path, entry, vec![parent_entry]))
510+
tokio::spawn(async move {
511+
let mut new_entries = Vec::new();
512+
let mut parent_diff = parent.diff(ctx.clone(), store.clone(), mf_id);
513+
while let Some(diff_entry) = parent_diff.try_next().await? {
514+
match diff_entry {
515+
Diff::Added(path, entry) => new_entries.push((path, entry, vec![])),
516+
Diff::Removed(..) => continue,
517+
Diff::Changed(path, parent_entry, entry) => {
518+
new_entries.push((path, entry, vec![parent_entry]))
519+
}
518520
}
519521
}
520-
}
521522

522-
let paths: Vec<_> = new_entries
523-
.clone()
524-
.into_iter()
525-
.map(|(path, _, _)| path)
526-
.collect();
527-
528-
let futs = diff_against.into_iter().skip(1).map(move |p| {
529-
p.find_entries(ctx.clone(), store.clone(), paths.clone())
530-
.try_collect::<HashMap<_, _>>()
531-
});
532-
let entries_in_parents = future::try_join_all(futs).await?;
533-
534-
let mut res = vec![];
535-
for (path, unode, mut parent_entries) in new_entries {
536-
let mut new_entry = true;
537-
for p in &entries_in_parents {
538-
if let Some(parent_entry) = p.get(&path) {
539-
if parent_entry == &unode {
540-
new_entry = false;
541-
break;
542-
} else {
543-
parent_entries.push(parent_entry.clone());
523+
let paths: Vec<_> = new_entries
524+
.clone()
525+
.into_iter()
526+
.map(|(path, _, _)| path)
527+
.collect();
528+
529+
let futs = diff_against.into_iter().skip(1).map(move |p| {
530+
p.find_entries(ctx.clone(), store.clone(), paths.clone())
531+
.try_collect::<HashMap<_, _>>()
532+
});
533+
let entries_in_parents = future::try_join_all(futs).await?;
534+
535+
let mut res = vec![];
536+
for (path, unode, mut parent_entries) in new_entries {
537+
let mut new_entry = true;
538+
for p in &entries_in_parents {
539+
if let Some(parent_entry) = p.get(&path) {
540+
if parent_entry == &unode {
541+
new_entry = false;
542+
break;
543+
} else {
544+
parent_entries.push(parent_entry.clone());
545+
}
544546
}
545547
}
546-
}
547548

548-
if new_entry {
549-
res.push((path, unode, parent_entries));
549+
if new_entry {
550+
res.push((path, unode, parent_entries));
551+
}
550552
}
551-
}
552553

553-
Ok(stream::iter(res.into_iter().map(Ok)))
554+
Ok(stream::iter(res.into_iter().map(Ok)))
555+
})
556+
.await?
554557
}
555558
.try_flatten_stream()
556559
.right_stream(),

eden/mononoke/tests/integration/test-newadmin-derived-data-list-manifests.t

+3-3
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ Fsnodes from root path
6161
$ with_stripped_logs mononoke_newadmin derived-data -R repo list-manifests -p "" -i "$B" fsnodes
6262
A eb56488e97bb4cf5eb17f05357b80108a4a71f6c3bab52dfcaec07161d105ec9 regular 1
6363
B 55662471e2a28db8257939b2f9a2d24e65b46a758bac12914a58f17dcde6905f regular 1
64-
b/hoo.txt 88c50336ada15d8abe61f2adce8af17b63eb74985d50eec76d4d0248f33bb4a9 regular 5
6564
a/foo.txt 67f9f510b6a13f94986928ba0f270ec005b194edd77b22a13dec797471a4fe85 regular 5
6665
a/b/bar.txt 638aceddb6283739ca98ac2cb18bf6d8d5358439ea187fd4ab0257d24d6d6e47 regular 5
66+
b/hoo.txt 88c50336ada15d8abe61f2adce8af17b63eb74985d50eec76d4d0248f33bb4a9 regular 5
6767
Unodes
6868
$ with_stripped_logs mononoke_newadmin derived-data -R repo list-manifests -p "a" -B main unodes
6969
a/ ManifestUnodeId(Blake2(dbdbdd1b393b32741aaab820850468c06b3f50319bea3728e8b2d346e61a01ef))
@@ -79,12 +79,12 @@ Unodes from root
7979
/ ManifestUnodeId(Blake2(8ade1b6151194edff398e823450e3bfbc8a1252958ea89f0c3ef58b0c3d30e70))
8080
A FileUnodeId(Blake2(5da8409b6ec0f3759444f93c2c5194f5c94c02037095ca16b5f3e0f70152c613))
8181
B FileUnodeId(Blake2(eb68a776a3017fcc811f6f23a8724a771db09de2f35fda2db314b580d41fb7ae))
82-
b/ ManifestUnodeId(Blake2(4668835e236dfec9b0273f21f33cfa4570769a24a5a5422b10920b28e5440092))
83-
b/hoo.txt FileUnodeId(Blake2(54942dc4ea2bd38839a40566d01e06d56e479adaeba9b3c64b94e55ae6911936))
8482
a/ ManifestUnodeId(Blake2(abb223a8d49252e82a934e09c8031ce77dd5fe70d481aace567b6f0b13e90e95))
8583
a/foo.txt FileUnodeId(Blake2(5b5ddd33b0347715e192bfc25bc172ed8c5800d87ba3d3238ef88dee25d28dc6))
8684
a/b/ ManifestUnodeId(Blake2(0d78d5210f51fcf3f7dd906722f1a0080ad4062b71c32a811b13edec903eeb06))
8785
a/b/bar.txt FileUnodeId(Blake2(4e8fbca02d5fa0d2a9abb7f075d8b5c4ad22e54e49dd6e18e00590032b1d3064))
86+
b/ ManifestUnodeId(Blake2(4668835e236dfec9b0273f21f33cfa4570769a24a5a5422b10920b28e5440092))
87+
b/hoo.txt FileUnodeId(Blake2(54942dc4ea2bd38839a40566d01e06d56e479adaeba9b3c64b94e55ae6911936))
8888

8989
Deleted manifests
9090
$ with_stripped_logs mononoke_newadmin derived-data -R repo list-manifests -p "a" -i "$B" deleted-manifests

0 commit comments

Comments
 (0)