55 * GNU General Public License version 2.
66 */
77
8- use std:: collections:: BTreeMap ;
9- use std:: collections:: HashMap ;
108use std:: collections:: HashSet ;
119use std:: sync:: Arc ;
1210
1311use anyhow:: Error ;
1412use anyhow:: Result ;
1513use blobrepo:: BlobRepo ;
16- use changeset_fetcher:: ChangesetFetcherArc ;
17- use changeset_fetcher:: ChangesetFetcherRef ;
1814use context:: CoreContext ;
1915use derived_data_utils:: DerivedUtils ;
2016use futures:: stream;
2117use futures:: stream:: FuturesUnordered ;
22- use futures:: stream:: StreamExt ;
2318use futures:: stream:: TryStreamExt ;
2419use mononoke_types:: ChangesetId ;
25- use mononoke_types:: Generation ;
2620use repo_derived_data:: RepoDerivedDataArc ;
2721use skiplist:: SkiplistIndex ;
28- use slog:: info;
2922
3023/// Determine which heads are underived in any of the derivers.
3124async fn underived_heads (
@@ -51,29 +44,6 @@ async fn underived_heads(
5144 . await
5245}
5346
54- /// If skiplist parents are not available, fetch the parents and their
55- /// generation from the repo.
56- async fn parents_with_generations (
57- ctx : & CoreContext ,
58- repo : & BlobRepo ,
59- csid : ChangesetId ,
60- ) -> Result < Vec < ( ChangesetId , Generation ) > > {
61- let parents = repo. changeset_fetcher ( ) . get_parents ( ctx, csid) . await ?;
62- let parents_with_generations =
63- stream:: iter ( parents. into_iter ( ) . map ( |parent_csid| async move {
64- let gen = repo
65- . changeset_fetcher ( )
66- . get_generation_number ( ctx, parent_csid)
67- . await ?;
68- Ok ( Some ( ( parent_csid, gen) ) )
69- } ) )
70- . buffered ( 100 )
71- . try_filter_map ( |maybe_csid_gen| async move { Ok :: < _ , Error > ( maybe_csid_gen) } )
72- . try_collect :: < Vec < _ > > ( )
73- . await ?;
74- Ok ( parents_with_generations)
75- }
76-
7747/// Slice a respository into a sequence of slices for derivation.
7848///
7949/// For large repositories with a long history, computing the full set of
@@ -124,114 +94,14 @@ pub(crate) async fn slice_repository(
12494 derivers : & [ Arc < dyn DerivedUtils > ] ,
12595 heads : Vec < ChangesetId > ,
12696 slice_size : u64 ,
127- ) -> Result < ( usize , impl Iterator < Item = ( u64 , Vec < ChangesetId > ) > ) > {
128- let heads = underived_heads ( ctx, repo, derivers, heads. as_slice ( ) ) . await ?;
129-
130- if skiplist_index. indexed_node_count ( ) == 0 {
131- // This skiplist index is not populated. Generate a single
132- // slice with all heads.
133- info ! (
134- ctx. logger( ) ,
135- "Repository not sliced as skiplist index is not populated" ,
136- ) ;
137- let heads = heads. into_iter ( ) . collect ( ) ;
138- return Ok ( ( 1 , vec ! [ ( 0 , heads) ] . into_iter ( ) . rev ( ) ) ) ;
139- }
140-
141- // Add any unindexed heads to the skiplist index.
142- let changeset_fetcher = repo. changeset_fetcher_arc ( ) ;
143- for head in heads. iter ( ) {
144- skiplist_index
145- . add_node ( ctx, & changeset_fetcher, * head, std:: u64:: MAX )
146- . await ?;
147- }
148-
149- let mut head_generation_groups: BTreeMap < u64 , Vec < ChangesetId > > = BTreeMap :: new ( ) ;
150- stream:: iter ( heads. into_iter ( ) . map ( |csid| async move {
151- let gen = repo
152- . changeset_fetcher ( )
153- . get_generation_number ( ctx, csid)
154- . await ?;
155- Ok ( Some ( ( csid, gen) ) )
156- } ) )
157- . buffered ( 100 )
158- . try_for_each ( |maybe_csid_gen| {
159- if let Some ( ( csid, gen) ) = maybe_csid_gen {
160- let gen_group = ( gen. value ( ) / slice_size) * slice_size;
161- head_generation_groups
162- . entry ( gen_group)
163- . or_default ( )
164- . push ( csid) ;
165- }
166- async { Ok :: < _ , Error > ( ( ) ) }
167- } )
168- . await ?;
169-
170- let mut slices = Vec :: new ( ) ;
171- while let Some ( ( cur_gen, mut heads) ) = head_generation_groups. pop_last ( ) {
172- info ! (
173- ctx. logger( ) ,
174- "Adding slice starting at generation {} with {} heads ({} slices queued)" ,
175- cur_gen,
176- heads. len( ) ,
177- head_generation_groups. len( )
178- ) ;
179- let mut new_heads_groups = HashMap :: new ( ) ;
180- let mut seen: HashSet < _ > = heads. iter ( ) . cloned ( ) . collect ( ) ;
181- while let Some ( csid) = heads. pop ( ) {
182- let skip_parents = match skiplist_index. get_furthest_edges ( csid) {
183- Some ( skip_parents) => skip_parents,
184- None => {
185- // Ordinarily this shouldn't happen, as the skiplist ought
186- // to refer to commits that are also in the skiplist.
187- // However, if the commit is missing from the skiplist, we
188- // can look up the parents and their generations directly.
189- parents_with_generations ( ctx, repo, csid) . await ?
190- }
191- } ;
192-
193- for ( parent, gen) in skip_parents {
194- if gen. value ( ) >= cur_gen {
195- // This commit is in the same generation group.
196- if seen. insert ( parent) {
197- heads. push ( parent) ;
198- }
199- } else {
200- // This commit is in a new generation group.
201- let gen_group = ( gen. value ( ) / slice_size) * slice_size;
202- new_heads_groups. insert ( parent, gen_group) ;
203- }
204- }
205- }
206-
207- // Add all commits we've seen to the slice. The heads from the start
208- // of this iteration would be sufficient, however providing additional
209- // changesets will allow traversal of the graph to find all commits to
210- // run faster as it can fetch the parents of multiple commits at once.
211- slices. push ( ( cur_gen, seen. into_iter ( ) . collect ( ) ) ) ;
212-
213- // For each new head, check if it needs derivation, and if so, add it
214- // to its generation group.
215- let new_heads: Vec < _ > = new_heads_groups. keys ( ) . cloned ( ) . collect ( ) ;
216- let underived_new_heads =
217- underived_heads ( ctx, repo, derivers, new_heads. as_slice ( ) ) . await ?;
218- for head in underived_new_heads {
219- if let Some ( gen_group) = new_heads_groups. get ( & head) {
220- head_generation_groups
221- . entry ( * gen_group)
222- . or_default ( )
223- . push ( head) ;
224- }
225- }
226- }
227-
228- if !slices. is_empty ( ) {
229- info ! (
230- ctx. logger( ) ,
231- "Repository sliced into {} slices requiring derivation" ,
232- slices. len( )
233- ) ;
234- }
235-
236- Ok ( ( slices. len ( ) , slices. into_iter ( ) . rev ( ) ) )
97+ ) -> Result < Vec < ( u64 , Vec < ChangesetId > ) > > {
98+ slice_repository:: slice_repository (
99+ ctx,
100+ repo,
101+ skiplist_index,
102+ heads,
103+ |heads| async move { underived_heads ( ctx, repo, derivers, heads. as_slice ( ) ) . await } ,
104+ slice_size,
105+ )
106+ . await
237107}
0 commit comments