Skip to content

Commit d357155

Browse files
committed
[ENH]: add operator for garbage collection to list all files for collection version
1 parent 8fa48f8 commit d357155

File tree

3 files changed

+167
-0
lines changed

3 files changed

+167
-0
lines changed

rust/blockstore/src/arrow/provider.rs

+9
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,15 @@ pub struct RootManager {
469469
prefetched_roots: Arc<parking_lot::Mutex<HashMap<Uuid, Duration>>>,
470470
}
471471

472+
impl std::fmt::Debug for RootManager {
473+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474+
f.debug_struct("RootManager")
475+
.field("cache", &self.cache)
476+
.field("storage", &self.storage)
477+
.finish()
478+
}
479+
}
480+
472481
impl RootManager {
473482
pub fn new(storage: Storage, cache: Box<dyn PersistentCache<Uuid, RootReader>>) -> Self {
474483
let cache: Arc<dyn PersistentCache<Uuid, RootReader>> = cache.into();
rust/garbage_collector/src/operators/list_files_at_version.rs

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use async_trait::async_trait;
2+
use chroma_blockstore::{arrow::provider::RootManagerError, RootManager};
3+
use chroma_system::{Operator, OperatorType};
4+
use chroma_types::{chroma_proto::CollectionVersionFile, CollectionUuid, HNSW_PATH};
5+
use std::{collections::HashSet, str::FromStr};
6+
use thiserror::Error;
7+
use tokio::task::{JoinError, JoinSet};
8+
use uuid::Uuid;
9+
10+
#[derive(Debug)]
11+
pub struct ListFilesAtVersionInput {
12+
root_manager: RootManager,
13+
version_file: CollectionVersionFile,
14+
version: i64,
15+
}
16+
17+
impl ListFilesAtVersionInput {
18+
pub fn new(
19+
root_manager: RootManager,
20+
version_file: CollectionVersionFile,
21+
version: i64,
22+
) -> Self {
23+
Self {
24+
root_manager,
25+
version_file,
26+
version,
27+
}
28+
}
29+
}
30+
31+
#[derive(Debug)]
32+
pub struct ListFilesAtVersionOutput {
33+
pub collection_id: CollectionUuid,
34+
pub version: i64,
35+
pub file_paths: HashSet<String>,
36+
}
37+
38+
#[derive(Debug, Error)]
39+
pub enum ListFilesAtVersionError {
40+
#[error("Version history field missing")]
41+
VersionHistoryMissing,
42+
#[error("Version {0} not found")]
43+
VersionNotFound(i64),
44+
#[error("Invalid UUID: {0}")]
45+
InvalidUuid(uuid::Error),
46+
#[error("Sparse index fetch task failed: {0}")]
47+
SparseIndexTaskFailed(JoinError),
48+
#[error("Error fetching block IDs for sparse index: {0}")]
49+
FetchBlockIdsError(#[from] RootManagerError),
50+
#[error("Version file missing collection ID")]
51+
VersionFileMissingCollectionId,
52+
}
53+
54+
/// Stateless operator that lists every file referenced by a single version of
/// a collection. All data it works on arrives through
/// [`ListFilesAtVersionInput`].
#[derive(Clone, Debug)]
pub struct ListFilesAtVersionsOperator {}
56+
57+
#[async_trait]
58+
impl Operator<ListFilesAtVersionInput, ListFilesAtVersionOutput> for ListFilesAtVersionsOperator {
59+
type Error = ListFilesAtVersionError;
60+
61+
fn get_type(&self) -> OperatorType {
62+
OperatorType::IO
63+
}
64+
65+
async fn run(
66+
&self,
67+
input: &ListFilesAtVersionInput,
68+
) -> Result<ListFilesAtVersionOutput, Self::Error> {
69+
let collection_id = CollectionUuid::from_str(
70+
&input
71+
.version_file
72+
.collection_info_immutable
73+
.as_ref()
74+
.ok_or_else(|| ListFilesAtVersionError::VersionFileMissingCollectionId)?
75+
.collection_id,
76+
)
77+
.map_err(ListFilesAtVersionError::InvalidUuid)?;
78+
79+
let version_history = input
80+
.version_file
81+
.version_history
82+
.as_ref()
83+
.ok_or_else(|| ListFilesAtVersionError::VersionHistoryMissing)?;
84+
85+
let mut file_paths = HashSet::new();
86+
let mut sparse_index_ids = HashSet::new();
87+
88+
let version = version_history
89+
.versions
90+
.iter()
91+
.find(|v| v.version == input.version)
92+
.ok_or_else(|| ListFilesAtVersionError::VersionNotFound(input.version))?;
93+
94+
tracing::debug!(
95+
"Listing files at version {} for collection {}.",
96+
version.version,
97+
collection_id,
98+
);
99+
tracing::trace!(
100+
"Processing version {:#?} for collection {}",
101+
version,
102+
collection_id
103+
);
104+
105+
if let Some(segment_info) = &version.segment_info {
106+
for segment in &segment_info.segment_compaction_info {
107+
for (file_type, segment_paths) in &segment.file_paths {
108+
if file_type == "hnsw_index" || file_type == HNSW_PATH {
109+
for path in &segment_paths.paths {
110+
for hnsw_file in [
111+
"header.bin",
112+
"data_level0.bin",
113+
"length.bin",
114+
"link_lists.bin",
115+
] {
116+
file_paths.insert(format!("hnsw/{}/{}", path, hnsw_file));
117+
}
118+
}
119+
} else {
120+
// Must be a sparse index
121+
for path in &segment_paths.paths {
122+
file_paths.insert(format!("sparse_index/{}", path));
123+
124+
let sparse_index_id = Uuid::parse_str(path)
125+
.map_err(ListFilesAtVersionError::InvalidUuid)?;
126+
127+
sparse_index_ids.insert(sparse_index_id);
128+
}
129+
}
130+
}
131+
}
132+
}
133+
134+
let mut block_id_tasks = JoinSet::new();
135+
for sparse_index_id in sparse_index_ids {
136+
let root_manager = input.root_manager.clone();
137+
block_id_tasks
138+
.spawn(async move { root_manager.get_all_block_ids(&sparse_index_id).await });
139+
}
140+
141+
while let Some(res) = block_id_tasks.join_next().await {
142+
let block_ids = res
143+
.map_err(ListFilesAtVersionError::SparseIndexTaskFailed)?
144+
.map_err(ListFilesAtVersionError::FetchBlockIdsError)?;
145+
146+
for block_id in block_ids {
147+
file_paths.insert(format!("block/{}", block_id));
148+
}
149+
}
150+
151+
Ok(ListFilesAtVersionOutput {
152+
collection_id,
153+
version: input.version,
154+
file_paths,
155+
})
156+
}
157+
}

rust/garbage_collector/src/operators/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ pub mod fetch_lineage_file;
88
pub mod fetch_sparse_index_files;
99
pub mod fetch_version_file;
1010
pub mod get_version_file_paths;
11+
pub mod list_files_at_version;
1112
pub mod mark_versions_at_sysdb;

0 commit comments

Comments
 (0)