Skip to content

Commit 1c3b67d

Browse files
committed
Improve performance of size stats
This introduces two limits to the concurrency: * max memory kept for in-flight manifests * max number of concurrent manifest fetches With this, we don't need to limit the concurrency to a specific number of manifests; instead, we can fetch more manifests if they are small. Combining these two limits, we get an important performance optimization, particularly for repos with a large number of smallish manifests. I used this script to test and observe the performance of this: ```python import arraylake import time import icechunk icechunk.set_logs_filter("icechunk::stream_utils=trace") token = '...' client = arraylake.Client(token=token) repo = client.get_repo("earthmover-public/era5-surface-aws") start_time = time.time() repo.total_chunks_storage(None, None) print(" %s seconds" % (time.time() - start_time)) ``` At the time of running this, ERA5 took around 4 seconds to compute. Testing with real-world repos with small manifests shows performance improvements on the order of 30x.
1 parent d6d93d0 commit 1c3b67d

File tree

8 files changed

+221
-58
lines changed

8 files changed

+221
-58
lines changed

icechunk-python/python/icechunk/_icechunk_python.pyi

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1357,7 +1357,9 @@ class PyRepository:
13571357
self, message: str, *, branch: str, metadata: dict[str, Any] | None = None
13581358
) -> str: ...
13591359
def total_chunks_storage(
1360-
self, process_manifests_concurrently: int | None = None
1360+
self,
1361+
max_manifest_mem_bytes: int | None = None,
1362+
max_concurrent_manifest_fetches: int | None = None,
13611363
) -> int: ...
13621364

13631365
class PySession:

icechunk-python/python/icechunk/repository.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,9 @@ def rewrite_manifests(
729729
)
730730

731731
def total_chunks_storage(
732-
self, process_manifests_concurrently: int | None = None
732+
self,
733+
max_manifest_mem_bytes: int | None = None,
734+
max_concurrent_manifest_fetches: int | None = None,
733735
) -> int:
734736
"""Calculate the total storage used for chunks, in bytes.
735737
@@ -743,8 +745,12 @@ def total_chunks_storage(
743745
744746
Parameters
745747
----------
746-
process_manifests_concurrently : int | None
747-
Process this many manifests concurrently. Defaults to 10.
748+
max_manifest_mem_bytes : int | None
749+
Don't use more than this memory to store in-flight manifests. Defaults to 512 MB.
750+
max_concurrent_manifest_fetches : int | None
751+
Don't run more than this many concurrent manifest fetches. Defaults to 500.
748752
"""
749753

750-
return self._repository.total_chunks_storage(process_manifests_concurrently)
754+
return self._repository.total_chunks_storage(
755+
max_manifest_mem_bytes, max_concurrent_manifest_fetches
756+
)

icechunk-python/src/repository.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
borrow::Cow,
33
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4-
num::NonZeroU16,
4+
num::{NonZeroU16, NonZeroUsize},
55
sync::Arc,
66
};
77

@@ -1053,7 +1053,8 @@ impl PyRepository {
10531053
pub fn total_chunks_storage(
10541054
&self,
10551055
py: Python<'_>,
1056-
process_manifests_concurrently: Option<NonZeroU16>,
1056+
max_manifest_mem_bytes: Option<NonZeroUsize>,
1057+
max_concurrent_manifest_fetches: Option<NonZeroU16>,
10571058
) -> PyResult<u64> {
10581059
// This function calls block_on, so we need to allow other thread python to make progress
10591060
py.allow_threads(move || {
@@ -1071,8 +1072,12 @@ impl PyRepository {
10711072
storage.as_ref(),
10721073
&storage_settings,
10731074
asset_manager,
1074-
process_manifests_concurrently.unwrap_or(
1075-
NonZeroU16::try_from(10).unwrap_or(NonZeroU16::MIN),
1075+
max_manifest_mem_bytes.unwrap_or(
1076+
NonZeroUsize::try_from(512 * 1024 * 1024)
1077+
.unwrap_or(NonZeroUsize::MIN),
1078+
),
1079+
max_concurrent_manifest_fetches.unwrap_or(
1080+
NonZeroU16::try_from(500).unwrap_or(NonZeroU16::MIN),
10761081
),
10771082
)
10781083
.await

icechunk/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub mod storage;
3232
pub mod store;
3333
#[cfg(test)]
3434
pub mod strategies;
35+
mod stream_utils;
3536
pub mod virtual_chunks;
3637

3738
pub use config::{ObjectStoreConfig, RepositoryConfig};

icechunk/src/ops/gc.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
ChunkId, IcechunkFormatError, IcechunkFormatErrorKind, ManifestId, SnapshotId,
1313
manifest::ChunkPayload,
1414
},
15-
ops::pointed_snapshot_ids,
15+
ops::pointed_snapshots,
1616
refs::{Ref, RefError, delete_branch, delete_tag, list_refs},
1717
repository::{RepositoryError, RepositoryErrorKind},
1818
storage::{self, DeleteObjectsResult, ListInfo},
@@ -174,23 +174,22 @@ pub async fn garbage_collect(
174174
}
175175

176176
tracing::info!("Finding GC roots");
177-
let all_snaps = pointed_snapshot_ids(
177+
let all_snaps = pointed_snapshots(
178178
storage,
179179
storage_settings,
180180
Arc::clone(&asset_manager),
181181
&config.extra_roots,
182182
)
183183
.await?;
184184

185-
// FIXME: add attribute files
186185
let mut keep_chunks = HashSet::new();
187186
let mut keep_manifests = HashSet::new();
188187
let mut keep_snapshots = HashSet::new();
189188

190189
tracing::info!("Calculating retained objects");
191190
pin!(all_snaps);
192-
while let Some(snap_id) = all_snaps.try_next().await? {
193-
let snap = asset_manager.fetch_snapshot(&snap_id).await?;
191+
while let Some(snap) = all_snaps.try_next().await? {
192+
let snap_id = snap.id();
194193
if config.deletes_snapshots() && keep_snapshots.insert(snap_id.clone()) {
195194
tracing::trace!("Adding snapshot to keep list: {}", &snap_id);
196195
}
@@ -240,6 +239,8 @@ pub async fn garbage_collect(
240239

241240
let mut summary = GCSummary::default();
242241

242+
tracing::info!("Starting deletes");
243+
243244
if config.deletes_snapshots() {
244245
let res = gc_snapshots(
245246
asset_manager.as_ref(),

icechunk/src/ops/stats.rs

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,30 @@
1-
use futures::{TryStream, TryStreamExt as _, future::ready, stream};
1+
use futures::{TryStreamExt, future::ready, stream};
22
use std::{
33
collections::HashSet,
4-
num::NonZeroU16,
4+
num::{NonZeroU16, NonZeroUsize},
55
sync::{Arc, Mutex},
66
};
7+
use tracing::trace;
78

89
use crate::{
910
Storage,
1011
asset_manager::AssetManager,
11-
format::{ChunkId, ManifestId, manifest::ChunkPayload},
12+
format::{
13+
ChunkId,
14+
manifest::{ChunkPayload, Manifest},
15+
snapshot::ManifestFileInfo,
16+
},
1217
ops::pointed_snapshots,
13-
repository::{RepositoryErrorKind, RepositoryResult},
18+
repository::{RepositoryError, RepositoryErrorKind, RepositoryResult},
1419
storage,
20+
stream_utils::{StreamLimiter, try_unique_stream},
1521
};
1622

17-
async fn manifest_chunks_storage(
18-
manifest_id: ManifestId,
19-
manifest_size: u64,
20-
asset_manager: Arc<AssetManager>,
23+
fn calculate_manifest_storage(
24+
manifest: Arc<Manifest>,
2125
seen_chunks: Arc<Mutex<HashSet<ChunkId>>>,
2226
) -> RepositoryResult<u64> {
23-
let manifest = asset_manager.fetch_manifest(&manifest_id, manifest_size).await?;
27+
trace!(manifest_id = %manifest.id(), "Processing manifest");
2428
let mut size = 0;
2529
for payload in manifest.chunk_payloads() {
2630
match payload {
@@ -47,36 +51,18 @@ async fn manifest_chunks_storage(
4751
}
4852
}
4953
}
54+
trace!(manifest_id = %manifest.id(), "Manifest done");
5055
Ok(size)
5156
}
5257

53-
pub fn try_unique_stream<S, T, E, F, V>(
54-
f: F,
55-
stream: S,
56-
) -> impl TryStream<Ok = T, Error = E>
57-
where
58-
F: Fn(&S::Ok) -> V,
59-
S: TryStream<Ok = T, Error = E>,
60-
V: Eq + std::hash::Hash,
61-
{
62-
let mut seen = HashSet::new();
63-
stream.try_filter(move |item| {
64-
let v = f(item);
65-
if seen.insert(v) {
66-
futures::future::ready(true)
67-
} else {
68-
futures::future::ready(false)
69-
}
70-
})
71-
}
72-
7358
/// Compute the total size in bytes of all committed repo chunks.
7459
/// It doesn't include inline or virtual chunks.
7560
pub async fn repo_chunks_storage(
7661
storage: &(dyn Storage + Send + Sync),
7762
storage_settings: &storage::Settings,
7863
asset_manager: Arc<AssetManager>,
79-
process_manifests_concurrently: NonZeroU16,
64+
max_manifest_mem_bytes: NonZeroUsize,
65+
max_concurrent_manifest_fetches: NonZeroU16,
8066
) -> RepositoryResult<u64> {
8167
let extra_roots = Default::default();
8268
let all_snaps = pointed_snapshots(
@@ -90,27 +76,59 @@ pub async fn repo_chunks_storage(
9076
let all_manifest_infos = all_snaps
9177
// this could be slightly optimized by not collecting all manifest info records into a vec
9278
// but we don't expect too many, and they are small anyway
93-
.map_ok(|snap| stream::iter(snap.manifest_files().map(Ok).collect::<Vec<_>>()))
79+
.map_ok(|snap| {
80+
stream::iter(
81+
snap.manifest_files().map(Ok::<_, RepositoryError>).collect::<Vec<_>>(),
82+
)
83+
})
9484
.try_flatten();
85+
86+
// we don't want to check manifests more than once, so we unique them by their id
9587
let unique_manifest_infos = try_unique_stream(|mi| mi.id.clone(), all_manifest_infos);
9688

89+
// we want to fetch many manifests in parallel, but not more than memory allows
90+
// for this we use the StreamLimiter using the manifest size in bytes for usage
91+
let limiter = &Arc::new(StreamLimiter::new(
92+
max_manifest_mem_bytes.get(),
93+
|m: &ManifestFileInfo| m.size_bytes as usize,
94+
));
95+
96+
// The StreamLimiter works by calling limit on every element before they are processed
97+
let rate_limited_manifests = unique_manifest_infos
98+
.and_then(|m| async move { Ok(limiter.clone().limit(m).await) });
99+
97100
let seen_chunks = &Arc::new(Mutex::new(HashSet::new()));
98101
let asset_manager = &asset_manager;
99102

100-
let res = unique_manifest_infos
101-
.map_ok(|manifest_info| async move {
102-
let manifest_size = manifest_info.size_bytes;
103-
manifest_chunks_storage(
104-
manifest_info.id,
105-
manifest_size,
106-
Arc::clone(asset_manager),
107-
Arc::clone(seen_chunks),
108-
)
109-
.await
103+
let (_, res) = rate_limited_manifests
104+
.map_ok(|m| async move {
105+
let manifest =
106+
Arc::clone(asset_manager).fetch_manifest(&m.id, m.size_bytes).await?;
107+
Ok((manifest, m))
108+
})
109+
// Now we can buffer a bunch of fetch_manifest operations. Because we are using
110+
// StreamLimiter we know memory is not going to blow up
111+
.try_buffer_unordered(max_concurrent_manifest_fetches.get() as usize)
112+
.map_ok(|(manifest, minfo)| async move {
113+
let size = calculate_manifest_storage(manifest, Arc::clone(seen_chunks))?;
114+
Ok((size, minfo))
115+
})
116+
// We do some more buffering to get some concurrency on the processing of the manifest file
117+
// TODO: this should actually happen in a CPU bounded worker pool
118+
.try_buffer_unordered(4)
119+
// Now StreamLimiter requires us to call free, this will make room for more manifests to be
120+
// fetched into the previous buffer
121+
.and_then(|(size, minfo)| async move {
122+
limiter.clone().free(minfo).await;
123+
Ok(size)
124+
})
125+
.try_fold((0u64, 0), |(processed, total_size), partial| {
126+
//info!("Processed {processed} manifests");
127+
ready(Ok((processed + 1, total_size + partial)))
110128
})
111-
.try_buffered(process_manifests_concurrently.get() as usize)
112-
.try_fold(0, |total, partial| ready(Ok(total + partial)))
113129
.await?;
114130

131+
debug_assert_eq!(limiter.current_usage().await, (0, 0));
132+
115133
Ok(res)
116134
}

0 commit comments

Comments
 (0)