1- use crate :: operators:: {
2- fetch_lineage_file:: {
3- FetchLineageFileError , FetchLineageFileInput , FetchLineageFileOperator ,
4- FetchLineageFileOutput ,
5- } ,
6- fetch_version_file:: {
7- FetchVersionFileError , FetchVersionFileInput , FetchVersionFileOperator ,
8- FetchVersionFileOutput ,
9- } ,
10- get_version_file_paths:: {
11- GetVersionFilePathsError , GetVersionFilePathsInput , GetVersionFilePathsOperator ,
12- GetVersionFilePathsOutput ,
1+ use crate :: {
2+ operators:: {
3+ fetch_lineage_file:: {
4+ FetchLineageFileError , FetchLineageFileInput , FetchLineageFileOperator ,
5+ FetchLineageFileOutput ,
6+ } ,
7+ fetch_version_file:: {
8+ FetchVersionFileError , FetchVersionFileInput , FetchVersionFileOperator ,
9+ FetchVersionFileOutput ,
10+ } ,
11+ get_version_file_paths:: {
12+ GetVersionFilePathsError , GetVersionFilePathsInput , GetVersionFilePathsOperator ,
13+ GetVersionFilePathsOutput ,
14+ } ,
1315 } ,
16+ types:: { VersionGraph , VersionGraphNode , VersionStatus } ,
1417} ;
1518use async_trait:: async_trait;
1619use base64:: { prelude:: BASE64_STANDARD , Engine } ;
@@ -32,15 +35,6 @@ use thiserror::Error;
3235use tokio:: sync:: oneshot:: { error:: RecvError , Sender } ;
3336use tracing:: Span ;
3437
35- #[ derive( Debug , Clone , Copy ) ]
36- pub enum VersionStatus {
37- #[ allow( dead_code) ]
38- Alive {
39- created_at : DateTime < chrono:: Utc > ,
40- } ,
41- Deleted ,
42- }
43-
4438#[ derive( Debug , Clone ) ]
4539struct VersionDependency {
4640 source_collection_id : CollectionUuid ,
@@ -91,16 +85,6 @@ impl ConstructVersionGraphOrchestrator {
9185 }
9286}
9387
94- #[ derive( Debug , Clone ) ]
95- pub struct VersionGraphNode {
96- pub collection_id : CollectionUuid ,
97- pub version : i64 ,
98- #[ allow( dead_code) ]
99- pub status : VersionStatus ,
100- }
101-
102- pub type VersionGraph = DiGraph < VersionGraphNode , ( ) > ;
103-
10488#[ derive( Debug ) ]
10589#[ allow( dead_code) ]
10690pub struct ConstructVersionGraphResponse {
@@ -130,8 +114,10 @@ pub enum ConstructVersionGraphError {
130114 InvalidUuid ( #[ from] uuid:: Error ) ,
131115 #[ error( "Invalid timestamp: {0}" ) ]
132116 InvalidTimestamp ( i64 ) ,
133- #[ error( "Expected node not found while constructing graph" ) ]
134- ExpectedNodeNotFound ,
117+ #[ error( "Expected node not found while constructing graph (collection {0}@v{1:?})" ) ]
118+ ExpectedNodeNotFound ( CollectionUuid , Option < i64 > ) ,
119+ #[ error( "Invariant violation: {0}" ) ]
120+ InvariantViolation ( String ) ,
135121}
136122
137123impl < E > From < TaskError < E > > for ConstructVersionGraphError
@@ -159,7 +145,8 @@ impl ChromaError for ConstructVersionGraphError {
159145 ConstructVersionGraphError :: FetchVersionFilePaths ( err) => err. code ( ) ,
160146 ConstructVersionGraphError :: InvalidUuid ( _) => ErrorCodes :: Internal ,
161147 ConstructVersionGraphError :: InvalidTimestamp ( _) => ErrorCodes :: InvalidArgument ,
162- ConstructVersionGraphError :: ExpectedNodeNotFound => ErrorCodes :: Internal ,
148+ ConstructVersionGraphError :: ExpectedNodeNotFound ( _, _) => ErrorCodes :: Internal ,
149+ ConstructVersionGraphError :: InvariantViolation ( _) => ErrorCodes :: Internal ,
163150 }
164151 }
165152}
@@ -224,9 +211,11 @@ impl ConstructVersionGraphOrchestrator {
224211 ctx : & ComponentContext < ConstructVersionGraphOrchestrator > ,
225212 ) -> Result < ( ) , ConstructVersionGraphError > {
226213 if self . num_pending_tasks == 0 {
214+ // This map will be used as a basis for building the graph
227215 let mut versions_by_collection_id: HashMap < CollectionUuid , Vec < ( i64 , VersionStatus ) > > =
228216 HashMap :: new ( ) ;
229217
218+ // Add all known versions from version files to map
230219 for ( collection_id, version_file) in self . version_files . iter ( ) {
231220 if let Some ( versions) = & version_file. version_history {
232221 for version in versions. versions . iter ( ) {
@@ -251,6 +240,7 @@ impl ConstructVersionGraphOrchestrator {
251240 }
252241 }
253242
243+ // If any version appears as a version dependency (from the lineage file) but does not already exist in the map from the version files, the version must have been deleted.
254244 for dependency in self . version_dependencies . iter ( ) {
255245 let source_collection_id = dependency. source_collection_id ;
256246 let source_collection_version = dependency. source_collection_version ;
@@ -272,6 +262,11 @@ impl ConstructVersionGraphOrchestrator {
272262 versions. sort_unstable_by_key ( |v| v. 0 ) ;
273263 }
274264
265+ tracing:: trace!(
266+ "Versions by collection ID: {:#?}" ,
267+ versions_by_collection_id
268+ ) ;
269+
275270 let mut graph = DiGraph :: new ( ) ;
276271 for ( collection_id, versions) in versions_by_collection_id. iter ( ) {
277272 let mut prev_node = None ;
@@ -282,12 +277,14 @@ impl ConstructVersionGraphOrchestrator {
282277 status : * status,
283278 } ) ;
284279 if let Some ( prev) = prev_node {
280+ // Add edge between each successive pair of collection versions
285281 graph. add_edge ( prev, node, ( ) ) ;
286282 }
287283 prev_node = Some ( node) ;
288284 }
289285 }
290286
287+ // Add edges for forked collections
291288 for dependency in self . version_dependencies . iter ( ) {
292289 let source_node = graph
293290 . node_indices ( )
@@ -296,15 +293,25 @@ impl ConstructVersionGraphOrchestrator {
296293 node. collection_id == dependency. source_collection_id
297294 && node. version == dependency. source_collection_version
298295 } )
299- . ok_or ( ConstructVersionGraphError :: ExpectedNodeNotFound ) ?;
296+ . ok_or_else ( || {
297+ ConstructVersionGraphError :: ExpectedNodeNotFound (
298+ dependency. source_collection_id ,
299+ Some ( dependency. source_collection_version ) ,
300+ )
301+ } ) ?;
300302
301303 let target_node = graph
302304 . node_indices ( )
303305 . find ( |n| {
304306 let node = graph. node_weight ( * n) . expect ( "node index should exist" ) ;
305307 node. collection_id == dependency. target_collection_id
306308 } )
307- . ok_or ( ConstructVersionGraphError :: ExpectedNodeNotFound ) ?;
309+ . ok_or_else ( || {
310+ ConstructVersionGraphError :: ExpectedNodeNotFound (
311+ dependency. target_collection_id ,
312+ None ,
313+ )
314+ } ) ?;
308315
309316 graph. add_edge ( source_node, target_node, ( ) ) ;
310317 }
@@ -317,6 +324,15 @@ impl ConstructVersionGraphOrchestrator {
317324
318325 tracing:: trace!( "Version files: {:#?}" , self . version_files) ;
319326
327+ let components = petgraph:: algo:: connected_components ( & graph) ;
328+ if components != 1 {
329+ // This is a defensive check, it should never happen
330+ return Err ( ConstructVersionGraphError :: InvariantViolation ( format ! (
331+ "Graph is not fully connected, found {} components" ,
332+ components
333+ ) ) ) ;
334+ }
335+
320336 self . terminate_with_result (
321337 Ok ( ConstructVersionGraphResponse {
322338 graph,
0 commit comments