1- use std:: { collections:: HashSet , future :: ready , sync:: Arc } ;
1+ use std:: { collections:: HashSet , sync:: Arc } ;
22
3+ use async_stream:: try_stream;
34use futures:: { Stream , StreamExt as _, TryStreamExt as _, stream} ;
5+ use tokio:: pin;
46use tracing:: instrument;
57
68use crate :: {
@@ -21,8 +23,6 @@ pub async fn all_roots<'a>(
2123 extra_roots : & ' a HashSet < SnapshotId > ,
2224) -> RefResult < impl Stream < Item = RefResult < SnapshotId > > + ' a > {
2325 let all_refs = list_refs ( storage, storage_settings) . await ?;
24- // TODO: this could be optimized by not following the ancestry of snapshots that we have
25- // already seen
2626 let roots = stream:: iter ( all_refs)
2727 . then ( move |r| async move {
2828 r. fetch ( storage, storage_settings) . await . map ( |ref_data| ref_data. snapshot )
@@ -39,21 +39,73 @@ pub async fn pointed_snapshots<'a>(
3939 asset_manager : Arc < AssetManager > ,
4040 extra_roots : & ' a HashSet < SnapshotId > ,
4141) -> RepositoryResult < impl Stream < Item = RepositoryResult < SnapshotId > > + ' a > {
42- let roots = all_roots ( storage, storage_settings, extra_roots)
43- . await ?
44- . err_into :: < RepositoryError > ( ) ;
45- Ok ( roots
46- . and_then ( move |snap_id| {
42+ let mut seen: HashSet < SnapshotId > = HashSet :: new ( ) ;
43+ let res = try_stream ! {
44+ let roots = all_roots( storage, storage_settings, extra_roots)
45+ . await ?
46+ . err_into:: <RepositoryError >( ) ;
47+ pin!( roots) ;
48+
49+ while let Some ( pointed_snap_id) = roots. try_next( ) . await ? {
4750 let asset_manager = Arc :: clone( & asset_manager. clone( ) ) ;
48- async move {
49- let snap = asset_manager. fetch_snapshot ( & snap_id) . await ?;
50- let parents = Arc :: clone ( & asset_manager)
51- . snapshot_ancestry ( & snap. id ( ) )
52- . await ?
53- . map_ok ( |parent| parent. id )
54- . err_into ( ) ;
55- Ok ( stream:: once ( ready ( Ok ( snap_id) ) ) . chain ( parents) )
51+ if ! seen. contains( & pointed_snap_id) {
52+ let parents = asset_manager. snapshot_ancestry( & pointed_snap_id) . await ?;
53+ for await parent in parents {
54+ let snap_id = parent?. id;
55+ if seen. insert( snap_id. clone( ) ) {
56+ // it's a new snapshot
57+ yield snap_id
58+ } else {
59+ // as soon as we find a repeated snapshot
60+ // there is no point in continuing to retrieve
61+ // the rest of the ancestry, it must be already
62+ // retrieved from other ref
63+ break
64+ }
65+ }
5666 }
57- } )
58- . try_flatten ( ) )
67+ }
68+ } ;
69+ Ok ( res)
70+ }
71+
72+ #[ cfg( test) ]
73+ #[ allow( clippy:: panic, clippy:: unwrap_used, clippy:: expect_used) ]
74+ mod tests {
75+ use futures:: TryStreamExt as _;
76+ use std:: collections:: { HashMap , HashSet } ;
77+
78+ use bytes:: Bytes ;
79+
80+ use crate :: {
81+ Repository , format:: Path , new_in_memory_storage, ops:: pointed_snapshots,
82+ } ;
83+
84+ #[ tokio:: test]
85+ async fn test_pointed_snapshots_duplicate ( ) -> Result < ( ) , Box < dyn std:: error:: Error > >
86+ {
87+ let storage = new_in_memory_storage ( ) . await ?;
88+ let repo = Repository :: create ( None , storage. clone ( ) , HashMap :: new ( ) ) . await ?;
89+ let mut session = repo. writable_session ( "main" ) . await ?;
90+ session. add_group ( Path :: root ( ) , Bytes :: new ( ) ) . await ?;
91+ let snap = session. commit ( "commit" , None ) . await ?;
92+ repo. create_tag ( "tag1" , & snap) . await ?;
93+ let mut session = repo. writable_session ( "main" ) . await ?;
94+ session. add_group ( "/foo" . try_into ( ) . unwrap ( ) , Bytes :: new ( ) ) . await ?;
95+ let snap = session. commit ( "commit" , None ) . await ?;
96+ repo. create_tag ( "tag2" , & snap) . await ?;
97+
98+ let all_snaps = pointed_snapshots (
99+ storage. as_ref ( ) ,
100+ & storage. default_settings ( ) ,
101+ repo. asset_manager ( ) . clone ( ) ,
102+ & HashSet :: new ( ) ,
103+ )
104+ . await ?
105+ . try_collect :: < Vec < _ > > ( )
106+ . await ?;
107+
108+ assert_eq ! ( all_snaps. len( ) , 3 ) ;
109+ Ok ( ( ) )
110+ }
59111}
0 commit comments