@@ -21,7 +21,9 @@ use chroma_system::{
2121 wrap, ChannelError , ComponentContext , ComponentHandle , Dispatcher , Handler , Orchestrator ,
2222 PanicError , TaskError , TaskMessage , TaskResult ,
2323} ;
24- use chroma_types:: { chroma_proto:: CollectionVersionFile , CollectionUuid } ;
24+ use chroma_types:: {
25+ chroma_proto:: CollectionVersionFile , BatchGetCollectionSoftDeleteStatusError , CollectionUuid ,
26+ } ;
2527use chrono:: DateTime ;
2628use petgraph:: { dot:: Dot , graph:: DiGraph } ;
2729use std:: {
@@ -125,13 +127,17 @@ pub enum ConstructVersionGraphError {
125127 FetchLineageFile ( #[ from] FetchLineageFileError ) ,
126128 #[ error( "Error fetching version file paths: {0}" ) ]
127129 FetchVersionFilePaths ( #[ from] GetVersionFilePathsError ) ,
130+ #[ error( "Could not get collection soft delete status: {0}" ) ]
131+ BatchGetCollectionSoftDeleteStatus ( #[ from] BatchGetCollectionSoftDeleteStatusError ) ,
128132
129133 #[ error( "Invalid UUID: {0}" ) ]
130134 InvalidUuid ( #[ from] uuid:: Error ) ,
131135 #[ error( "Invalid timestamp: {0}" ) ]
132136 InvalidTimestamp ( i64 ) ,
133- #[ error( "Expected node not found while constructing graph" ) ]
134- ExpectedNodeNotFound ,
137+ #[ error( "Expected node not found while constructing graph (collection {0}@v{1:?})" ) ]
138+ ExpectedNodeNotFound ( CollectionUuid , Option < i64 > ) ,
139+ #[ error( "Invariant violation: {0}" ) ]
140+ InvariantViolation ( String ) ,
135141}
136142
137143impl < E > From < TaskError < E > > for ConstructVersionGraphError
@@ -157,9 +163,11 @@ impl ChromaError for ConstructVersionGraphError {
157163 ConstructVersionGraphError :: FetchVersionFile ( err) => err. code ( ) ,
158164 ConstructVersionGraphError :: FetchLineageFile ( err) => err. code ( ) ,
159165 ConstructVersionGraphError :: FetchVersionFilePaths ( err) => err. code ( ) ,
166+ ConstructVersionGraphError :: BatchGetCollectionSoftDeleteStatus ( err) => err. code ( ) ,
160167 ConstructVersionGraphError :: InvalidUuid ( _) => ErrorCodes :: Internal ,
161168 ConstructVersionGraphError :: InvalidTimestamp ( _) => ErrorCodes :: InvalidArgument ,
162- ConstructVersionGraphError :: ExpectedNodeNotFound => ErrorCodes :: Internal ,
169+ ConstructVersionGraphError :: ExpectedNodeNotFound ( _, _) => ErrorCodes :: Internal ,
170+ ConstructVersionGraphError :: InvariantViolation ( _) => ErrorCodes :: Internal ,
163171 }
164172 }
165173}
@@ -215,9 +223,11 @@ impl ConstructVersionGraphOrchestrator {
215223 ctx : & ComponentContext < ConstructVersionGraphOrchestrator > ,
216224 ) -> Result < ( ) , ConstructVersionGraphError > {
217225 if self . num_pending_tasks == 0 {
226+ // This map will be used as a basis for building the graph
218227 let mut versions_by_collection_id: HashMap < CollectionUuid , Vec < ( i64 , VersionStatus ) > > =
219228 HashMap :: new ( ) ;
220229
230+ // Add all known versions from version files to map
221231 for ( collection_id, version_file) in self . version_files . iter ( ) {
222232 if let Some ( versions) = & version_file. version_history {
223233 for version in versions. versions . iter ( ) {
@@ -242,6 +252,7 @@ impl ConstructVersionGraphOrchestrator {
242252 }
243253 }
244254
255+ // If any version appears as a version dependency (from the lineage file) but does not already exist in the map from the version files, the version must have been deleted.
245256 for dependency in self . version_dependencies . iter ( ) {
246257 let source_collection_id = dependency. source_collection_id ;
247258 let source_collection_version = dependency. source_collection_version ;
@@ -263,6 +274,11 @@ impl ConstructVersionGraphOrchestrator {
263274 versions. sort_unstable_by_key ( |v| v. 0 ) ;
264275 }
265276
277+ tracing:: trace!(
278+ "Versions by collection ID: {:#?}" ,
279+ versions_by_collection_id
280+ ) ;
281+
266282 let mut graph = DiGraph :: new ( ) ;
267283 for ( collection_id, versions) in versions_by_collection_id. iter ( ) {
268284 let mut prev_node = None ;
@@ -273,12 +289,14 @@ impl ConstructVersionGraphOrchestrator {
273289 status : * status,
274290 } ) ;
275291 if let Some ( prev) = prev_node {
292+ // Add edge between each successive pair of collection versions
276293 graph. add_edge ( prev, node, ( ) ) ;
277294 }
278295 prev_node = Some ( node) ;
279296 }
280297 }
281298
299+ // Add edges for forked collections
282300 for dependency in self . version_dependencies . iter ( ) {
283301 let source_node = graph
284302 . node_indices ( )
@@ -287,15 +305,25 @@ impl ConstructVersionGraphOrchestrator {
287305 node. collection_id == dependency. source_collection_id
288306 && node. version == dependency. source_collection_version
289307 } )
290- . ok_or ( ConstructVersionGraphError :: ExpectedNodeNotFound ) ?;
308+ . ok_or_else ( || {
309+ ConstructVersionGraphError :: ExpectedNodeNotFound (
310+ dependency. source_collection_id ,
311+ Some ( dependency. source_collection_version ) ,
312+ )
313+ } ) ?;
291314
292315 let target_node = graph
293316 . node_indices ( )
294317 . find ( |n| {
295318 let node = graph. node_weight ( * n) . expect ( "node index should exist" ) ;
296319 node. collection_id == dependency. target_collection_id
297320 } )
298- . ok_or ( ConstructVersionGraphError :: ExpectedNodeNotFound ) ?;
321+ . ok_or_else ( || {
322+ ConstructVersionGraphError :: ExpectedNodeNotFound (
323+ dependency. target_collection_id ,
324+ None ,
325+ )
326+ } ) ?;
299327
300328 graph. add_edge ( source_node, target_node, ( ) ) ;
301329 }
@@ -308,6 +336,15 @@ impl ConstructVersionGraphOrchestrator {
308336
309337 tracing:: trace!( "Version files: {:#?}" , self . version_files) ;
310338
339+ let components = petgraph:: algo:: connected_components ( & graph) ;
340+ if components != 1 {
341+ // This is a defensive check, it should never happen
342+ return Err ( ConstructVersionGraphError :: InvariantViolation ( format ! (
343+ "Graph is not fully connected, found {} components" ,
344+ components
345+ ) ) ) ;
346+ }
347+
311348 self . terminate_with_result (
312349 Ok ( ConstructVersionGraphResponse {
313350 graph,
0 commit comments