Skip to content

Commit c0389ea

Browse files
committed
[ENH]: add operator for garbage collection to list all files for collection version
1 parent dd8eba1 commit c0389ea

File tree

3 files changed

+154
-0
lines changed

3 files changed

+154
-0
lines changed

rust/blockstore/src/arrow/provider.rs

+9
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,15 @@ pub struct RootManager {
469469
prefetched_roots: Arc<parking_lot::Mutex<HashMap<Uuid, Duration>>>,
470470
}
471471

472+
impl std::fmt::Debug for RootManager {
473+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474+
f.debug_struct("RootManager")
475+
.field("cache", &self.cache)
476+
.field("storage", &self.storage)
477+
.finish()
478+
}
479+
}
480+
472481
impl RootManager {
473482
pub fn new(storage: Storage, cache: Box<dyn PersistentCache<Uuid, RootReader>>) -> Self {
474483
let cache: Arc<dyn PersistentCache<Uuid, RootReader>> = cache.into();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use async_trait::async_trait;
2+
use chroma_blockstore::{arrow::provider::RootManagerError, RootManager};
3+
use chroma_system::{Operator, OperatorType};
4+
use chroma_types::{chroma_proto::CollectionVersionFile, CollectionUuid, HNSW_PATH};
5+
use std::{collections::HashSet, str::FromStr};
6+
use thiserror::Error;
7+
use tokio::task::{JoinError, JoinSet};
8+
use uuid::Uuid;
9+
10+
#[derive(Debug)]
11+
pub struct ListFilesAtVersionInput {
12+
root_manager: RootManager,
13+
version_file: CollectionVersionFile,
14+
version: i64,
15+
}
16+
17+
impl ListFilesAtVersionInput {
18+
pub fn new(
19+
root_manager: RootManager,
20+
version_file: CollectionVersionFile,
21+
version: i64,
22+
) -> Self {
23+
Self {
24+
root_manager,
25+
version_file,
26+
version,
27+
}
28+
}
29+
}
30+
31+
#[derive(Debug)]
32+
pub struct ListFilesAtVersionOutput {
33+
pub collection_id: CollectionUuid,
34+
pub version: i64,
35+
pub file_paths: HashSet<String>,
36+
}
37+
38+
#[derive(Debug, Error)]
39+
pub enum ListFilesAtVersionsError {
40+
#[error("Version history field missing")]
41+
VersionHistoryMissing,
42+
#[error("Version {0} not found")]
43+
VersionNotFound(i64),
44+
#[error("Invalid UUID: {0}")]
45+
InvalidUuid(uuid::Error),
46+
#[error("Sparse index fetch task failed: {0}")]
47+
SparseIndexTaskFailed(JoinError),
48+
#[error("Error fetching block IDs for sparse index: {0}")]
49+
FetchBlockIdsError(#[from] RootManagerError),
50+
#[error("Version file missing collection ID")]
51+
VersionFileMissingCollectionId,
52+
}
53+
54+
#[derive(Clone, Debug)]
55+
pub struct ListFilesAtVersionsOperator {}
56+
57+
#[async_trait]
58+
impl Operator<ListFilesAtVersionInput, ListFilesAtVersionOutput> for ListFilesAtVersionsOperator {
59+
type Error = ListFilesAtVersionsError;
60+
61+
fn get_type(&self) -> OperatorType {
62+
OperatorType::IO
63+
}
64+
65+
async fn run(
66+
&self,
67+
input: &ListFilesAtVersionInput,
68+
) -> Result<ListFilesAtVersionOutput, Self::Error> {
69+
let version_history = input
70+
.version_file
71+
.version_history
72+
.as_ref()
73+
.ok_or_else(|| ListFilesAtVersionsError::VersionHistoryMissing)?;
74+
75+
let mut file_paths = HashSet::new();
76+
let mut sparse_index_ids = HashSet::new();
77+
78+
let version = version_history
79+
.versions
80+
.iter()
81+
.find(|v| v.version == input.version)
82+
.ok_or_else(|| ListFilesAtVersionsError::VersionNotFound(input.version))?;
83+
84+
if let Some(segment_info) = &version.segment_info {
85+
for segment in &segment_info.segment_compaction_info {
86+
for (file_type, segment_paths) in &segment.file_paths {
87+
if file_type == "hnsw_index" || file_type == HNSW_PATH {
88+
for path in &segment_paths.paths {
89+
for hnsw_file in [
90+
"header.bin",
91+
"data_level0.bin",
92+
"length.bin",
93+
"link_lists.bin",
94+
] {
95+
file_paths.insert(format!("hnsw/{}/{}", path, hnsw_file));
96+
}
97+
}
98+
} else {
99+
// Must be a sparse index
100+
for path in &segment_paths.paths {
101+
file_paths.insert(format!("sparse_index/{}", path));
102+
103+
let sparse_index_id = Uuid::parse_str(path)
104+
.map_err(ListFilesAtVersionsError::InvalidUuid)?;
105+
106+
sparse_index_ids.insert(sparse_index_id);
107+
}
108+
}
109+
}
110+
}
111+
}
112+
113+
let mut block_id_tasks = JoinSet::new();
114+
for sparse_index_id in sparse_index_ids {
115+
let root_manager = input.root_manager.clone();
116+
block_id_tasks
117+
.spawn(async move { root_manager.get_all_block_ids(&sparse_index_id).await });
118+
}
119+
120+
while let Some(res) = block_id_tasks.join_next().await {
121+
let block_ids = res
122+
.map_err(ListFilesAtVersionsError::SparseIndexTaskFailed)?
123+
.map_err(ListFilesAtVersionsError::FetchBlockIdsError)?;
124+
125+
for block_id in block_ids {
126+
file_paths.insert(format!("block/{}", block_id));
127+
}
128+
}
129+
130+
Ok(ListFilesAtVersionOutput {
131+
collection_id: CollectionUuid::from_str(
132+
&input
133+
.version_file
134+
.collection_info_immutable
135+
.as_ref()
136+
.ok_or_else(|| ListFilesAtVersionsError::VersionFileMissingCollectionId)?
137+
.collection_id,
138+
)
139+
.map_err(ListFilesAtVersionsError::InvalidUuid)?,
140+
version: input.version,
141+
file_paths,
142+
})
143+
}
144+
}

rust/garbage_collector/src/operators/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ pub mod fetch_lineage_file;
88
pub mod fetch_sparse_index_files;
99
pub mod fetch_version_file;
1010
pub mod get_version_file_paths;
11+
pub mod list_files_at_version;
1112
pub mod mark_versions_at_sysdb;

0 commit comments

Comments
 (0)