1- use futures:: { TryStream , TryStreamExt as _ , future:: ready, stream} ;
1+ use futures:: { TryStreamExt , future:: ready, stream} ;
22use std:: {
33 collections:: HashSet ,
4- num:: NonZeroU16 ,
4+ num:: { NonZeroU16 , NonZeroUsize } ,
55 sync:: { Arc , Mutex } ,
66} ;
7+ use tracing:: trace;
78
89use crate :: {
910 Storage ,
1011 asset_manager:: AssetManager ,
11- format:: { ChunkId , ManifestId , manifest:: ChunkPayload } ,
12+ format:: {
13+ ChunkId ,
14+ manifest:: { ChunkPayload , Manifest } ,
15+ snapshot:: ManifestFileInfo ,
16+ } ,
1217 ops:: pointed_snapshots,
13- repository:: { RepositoryErrorKind , RepositoryResult } ,
18+ repository:: { RepositoryError , RepositoryErrorKind , RepositoryResult } ,
1419 storage,
20+ stream_utils:: { StreamLimiter , try_unique_stream} ,
1521} ;
1622
17- async fn manifest_chunks_storage (
18- manifest_id : ManifestId ,
19- manifest_size : u64 ,
20- asset_manager : Arc < AssetManager > ,
23+ fn calculate_manifest_storage (
24+ manifest : Arc < Manifest > ,
2125 seen_chunks : Arc < Mutex < HashSet < ChunkId > > > ,
2226) -> RepositoryResult < u64 > {
23- let manifest = asset_manager . fetch_manifest ( & manifest_id , manifest_size ) . await ? ;
27+ trace ! ( manifest_id = %manifest . id ( ) , "Processing manifest" ) ;
2428 let mut size = 0 ;
2529 for payload in manifest. chunk_payloads ( ) {
2630 match payload {
@@ -47,36 +51,18 @@ async fn manifest_chunks_storage(
4751 }
4852 }
4953 }
54+ trace ! ( manifest_id = %manifest. id( ) , "Manifest done" ) ;
5055 Ok ( size)
5156}
5257
53- pub fn try_unique_stream < S , T , E , F , V > (
54- f : F ,
55- stream : S ,
56- ) -> impl TryStream < Ok = T , Error = E >
57- where
58- F : Fn ( & S :: Ok ) -> V ,
59- S : TryStream < Ok = T , Error = E > ,
60- V : Eq + std:: hash:: Hash ,
61- {
62- let mut seen = HashSet :: new ( ) ;
63- stream. try_filter ( move |item| {
64- let v = f ( item) ;
65- if seen. insert ( v) {
66- futures:: future:: ready ( true )
67- } else {
68- futures:: future:: ready ( false )
69- }
70- } )
71- }
72-
7358/// Compute the total size in bytes of all committed repo chunks.
7459/// It doesn't include inline or virtual chunks.
7560pub async fn repo_chunks_storage (
7661 storage : & ( dyn Storage + Send + Sync ) ,
7762 storage_settings : & storage:: Settings ,
7863 asset_manager : Arc < AssetManager > ,
79- process_manifests_concurrently : NonZeroU16 ,
64+ max_manifest_mem_bytes : NonZeroUsize ,
65+ max_concurrent_manifest_fetches : NonZeroU16 ,
8066) -> RepositoryResult < u64 > {
8167 let extra_roots = Default :: default ( ) ;
8268 let all_snaps = pointed_snapshots (
@@ -90,27 +76,59 @@ pub async fn repo_chunks_storage(
9076 let all_manifest_infos = all_snaps
9177 // this could be slightly optimized by not collecting all manifest info records into a vec
9278 // but we don't expect too many, and they are small anyway
93- . map_ok ( |snap| stream:: iter ( snap. manifest_files ( ) . map ( Ok ) . collect :: < Vec < _ > > ( ) ) )
79+ . map_ok ( |snap| {
80+ stream:: iter (
81+ snap. manifest_files ( ) . map ( Ok :: < _ , RepositoryError > ) . collect :: < Vec < _ > > ( ) ,
82+ )
83+ } )
9484 . try_flatten ( ) ;
85+
86+ // we don't want to check manifests more than once, so we unique them by their id
9587 let unique_manifest_infos = try_unique_stream ( |mi| mi. id . clone ( ) , all_manifest_infos) ;
9688
89+ // we want to fetch many manifests in parallel, but not more than memory allows
90+ // for this we use the StreamLimiter using the manifest size in bytes for usage
91+ let limiter = & Arc :: new ( StreamLimiter :: new (
92+ max_manifest_mem_bytes. get ( ) ,
93+ |m : & ManifestFileInfo | m. size_bytes as usize ,
94+ ) ) ;
95+
96+ // The StreamLimiter works by calling limit on every element before they are processed
97+ let rate_limited_manifests = unique_manifest_infos
98+ . and_then ( |m| async move { Ok ( limiter. clone ( ) . limit ( m) . await ) } ) ;
99+
97100 let seen_chunks = & Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ;
98101 let asset_manager = & asset_manager;
99102
100- let res = unique_manifest_infos
101- . map_ok ( |manifest_info| async move {
102- let manifest_size = manifest_info. size_bytes ;
103- manifest_chunks_storage (
104- manifest_info. id ,
105- manifest_size,
106- Arc :: clone ( asset_manager) ,
107- Arc :: clone ( seen_chunks) ,
108- )
109- . await
103+ let ( _, res) = rate_limited_manifests
104+ . map_ok ( |m| async move {
105+ let manifest =
106+ Arc :: clone ( asset_manager) . fetch_manifest ( & m. id , m. size_bytes ) . await ?;
107+ Ok ( ( manifest, m) )
108+ } )
109+ // Now we can buffer a bunch of fetch_manifest operations. Because we are using
110+ // StreamLimiter we know memory is not going to blow up
111+ . try_buffer_unordered ( max_concurrent_manifest_fetches. get ( ) as usize )
112+ . map_ok ( |( manifest, minfo) | async move {
113+ let size = calculate_manifest_storage ( manifest, Arc :: clone ( seen_chunks) ) ?;
114+ Ok ( ( size, minfo) )
115+ } )
116+ // We do some more buffering to get some concurrency on the processing of the manifest file
117+ // TODO: this should actually happen in a CPU bounded worker pool
118+ . try_buffer_unordered ( 4 )
119+ // Now StreamLimiter requires us to call free, this will make room for more manifests to be
120+ // fetched into the previous buffer
121+ . and_then ( |( size, minfo) | async move {
122+ limiter. clone ( ) . free ( minfo) . await ;
123+ Ok ( size)
124+ } )
125+ . try_fold ( ( 0u64 , 0 ) , |( processed, total_size) , partial| {
126+ //info!("Processed {processed} manifests");
127+ ready ( Ok ( ( processed + 1 , total_size + partial) ) )
110128 } )
111- . try_buffered ( process_manifests_concurrently. get ( ) as usize )
112- . try_fold ( 0 , |total, partial| ready ( Ok ( total + partial) ) )
113129 . await ?;
114130
131+ debug_assert_eq ! ( limiter. current_usage( ) . await , ( 0 , 0 ) ) ;
132+
115133 Ok ( res)
116134}
0 commit comments