@@ -32,7 +32,9 @@ use chroma_system::{
3232 PanicError , System , TaskError , TaskResult ,
3333} ;
3434use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
35- use chroma_types:: CollectionUuid ;
35+ use chroma_types:: {
36+ BatchGetCollectionSoftDeleteStatusError , CollectionUuid , DeleteCollectionError ,
37+ } ;
3638use chrono:: { DateTime , Utc } ;
3739use std:: collections:: { HashMap , HashSet } ;
3840use std:: str:: FromStr ;
@@ -61,6 +63,9 @@ pub struct GarbageCollectorOrchestrator {
6163 num_pending_tasks : usize ,
6264 min_versions_to_keep : u32 ,
6365 graph : Option < VersionGraph > ,
66+ soft_deleted_collections_to_gc : HashSet < CollectionUuid > ,
67+ tenant : Option < String > ,
68+ database_name : Option < String > ,
6469
6570 num_files_deleted : u32 ,
6671 num_versions_deleted : u32 ,
@@ -107,6 +112,9 @@ impl GarbageCollectorOrchestrator {
107112 num_pending_tasks : 0 ,
108113 min_versions_to_keep,
109114 graph : None ,
115+ soft_deleted_collections_to_gc : HashSet :: new ( ) ,
116+ tenant : None ,
117+ database_name : None ,
110118
111119 num_files_deleted : 0 ,
112120 num_versions_deleted : 0 ,
@@ -127,6 +135,8 @@ pub enum GarbageCollectorError {
127135 #[ error( "The task was aborted because resources were exhausted" ) ]
128136 Aborted ,
129137
138+ #[ error( "Failed to get collection soft delete status: {0}" ) ]
139+ BatchGetCollectionSoftDeleteStatus ( #[ from] BatchGetCollectionSoftDeleteStatusError ) ,
130140 #[ error( "Failed to construct version graph: {0}" ) ]
131141 ConstructVersionGraph ( #[ from] ConstructVersionGraphError ) ,
132142 #[ error( "Failed to compute versions to delete: {0}" ) ]
@@ -146,6 +156,8 @@ pub enum GarbageCollectorError {
146156 InvariantViolation ( String ) ,
147157 #[ error( "Could not parse UUID: {0}" ) ]
148158 UnparsableUuid ( #[ from] uuid:: Error ) ,
159+ #[ error( "Collection deletion failed: {0}" ) ]
160+ CollectionDeletionFailed ( #[ from] DeleteCollectionError ) ,
149161}
150162
151163impl ChromaError for GarbageCollectorError {
@@ -216,13 +228,33 @@ impl GarbageCollectorOrchestrator {
216228 self . lineage_file_path . clone ( ) ,
217229 ) ;
218230 let output = orchestrator. run ( self . system . clone ( ) ) . await ?;
231+
232+ let collection_ids = output. version_files . keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
233+ let soft_delete_statuses = self
234+ . sysdb_client
235+ . batch_get_collection_soft_delete_status ( collection_ids)
236+ . await ?;
237+ self . soft_deleted_collections_to_gc = soft_delete_statuses
238+ . iter ( )
239+ . filter_map (
240+ |( collection_id, status) | {
241+ if * status {
242+ Some ( * collection_id)
243+ } else {
244+ None
245+ }
246+ } ,
247+ )
248+ . collect ( ) ;
249+
219250 self . version_files = output. version_files ;
220251 self . graph = Some ( output. graph . clone ( ) ) ;
221252
222253 let task = wrap (
223254 Box :: new ( ComputeVersionsToDeleteOperator { } ) ,
224255 ComputeVersionsToDeleteInput {
225256 graph : output. graph ,
257+ soft_deleted_collections : self . soft_deleted_collections_to_gc . clone ( ) ,
226258 cutoff_time : self . absolute_cutoff_time ,
227259 min_versions_to_keep : self . min_versions_to_keep ,
228260 } ,
@@ -262,6 +294,13 @@ impl GarbageCollectorOrchestrator {
262294 } )
263295 . collect ( ) ;
264296
297+ self . pending_mark_versions_at_sysdb_tasks = output
298+ . versions
299+ . keys ( )
300+ . filter ( |collection_id| !self . soft_deleted_collections_to_gc . contains ( collection_id) )
301+ . cloned ( )
302+ . collect ( ) ;
303+
265304 for ( collection_id, versions) in & output. versions {
266305 let version_file = self
267306 . version_files
@@ -398,7 +437,7 @@ impl GarbageCollectorOrchestrator {
398437 output. collection_id, output. version
399438 ) ) ) ?;
400439
401- let root = graph
440+ let root_index = graph
402441 . node_indices ( )
403442 . find ( |& n| {
404443 graph
@@ -409,20 +448,26 @@ impl GarbageCollectorOrchestrator {
409448 . ok_or ( GarbageCollectorError :: InvariantViolation (
410449 "Expected to find root node" . to_string ( ) ,
411450 ) ) ?;
412-
413- let versions_from_root_to_this_node =
414- petgraph:: algo:: astar ( graph, root, |finish| finish == this_node, |_| 1 , |_| 0 )
415- . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
416- "Expected to find path from root to node for {}@v{}" ,
417- output. collection_id, output. version
418- ) ) ) ?
419- . 1
420- . into_iter ( )
421- . map ( |i| {
422- let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
423- node. version
424- } )
425- . collect :: < Vec < _ > > ( ) ;
451+ let root = graph. node_weight ( root_index) . expect ( "Node should exist" ) ;
452+
453+ let versions_from_root_to_this_node = petgraph:: algo:: astar (
454+ graph,
455+ root_index,
456+ |finish| finish == this_node,
457+ |_| 1 ,
458+ |_| 0 ,
459+ )
460+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
461+ "Expected to find path from root ({}@v{}) to node for {}@v{}" ,
462+ root. collection_id, root. version, output. collection_id, output. version
463+ ) ) ) ?
464+ . 1
465+ . into_iter ( )
466+ . map ( |i| {
467+ let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
468+ node. version
469+ } )
470+ . collect :: < Vec < _ > > ( ) ;
426471 let are_all_versions_v0 = versions_from_root_to_this_node
427472 . iter ( )
428473 . all ( |& version| version == 0 ) ;
@@ -523,14 +568,15 @@ impl GarbageCollectorOrchestrator {
523568 "Expected there to be at least one version file" . to_string ( ) ,
524569 ) ) ?;
525570 // Assumes that all collections in a fork tree are under the same tenant
526- let tenant_id = version_file
527- . collection_info_immutable
528- . as_ref ( )
529- . ok_or ( GarbageCollectorError :: InvariantViolation (
571+ let collection_info = version_file. collection_info_immutable . as_ref ( ) . ok_or (
572+ GarbageCollectorError :: InvariantViolation (
530573 "Expected collection_info_immutable to be set" . to_string ( ) ,
531- ) ) ?
532- . tenant_id
533- . clone ( ) ;
574+ ) ,
575+ ) ?;
576+ let tenant_id = collection_info. tenant_id . clone ( ) ;
577+ self . tenant = Some ( tenant_id. clone ( ) ) ;
578+ let database_name = collection_info. database_name . clone ( ) ;
579+ self . database_name = Some ( database_name. clone ( ) ) ;
534580
535581 let task = wrap (
536582 Box :: new ( DeleteUnusedFilesOperator :: new (
@@ -659,6 +705,85 @@ impl GarbageCollectorOrchestrator {
659705
660706 Ok ( ( ) )
661707 }
708+
709+ async fn handle_delete_versions_output (
710+ & mut self ,
711+ output : DeleteVersionsAtSysDbOutput ,
712+ ctx : & ComponentContext < Self > ,
713+ ) -> Result < ( ) , GarbageCollectorError > {
714+ tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
715+ self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
716+
717+ self . num_pending_tasks -= 1 ;
718+ if self . num_pending_tasks == 0 {
719+ for collection_id in self . soft_deleted_collections_to_gc . iter ( ) {
720+ let graph =
721+ self . graph
722+ . as_ref ( )
723+ . ok_or ( GarbageCollectorError :: InvariantViolation (
724+ "Expected graph to be set" . to_string ( ) ,
725+ ) ) ?;
726+
727+ // Find node with minimum version for the collection
728+ let first_collection_node = graph
729+ . node_indices ( )
730+ . filter ( |& n| {
731+ let node = graph. node_weight ( n) . expect ( "Node should exist" ) ;
732+ node. collection_id == * collection_id
733+ } )
734+ . min_by ( |a, b| {
735+ let a_node = graph. node_weight ( * a) . expect ( "Node should exist" ) ;
736+ let b_node = graph. node_weight ( * b) . expect ( "Node should exist" ) ;
737+ a_node. version . cmp ( & b_node. version )
738+ } )
739+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
740+ "Expected to find node for collection {}" ,
741+ collection_id
742+ ) ) ) ?;
743+
744+ // We cannot finalize collection deletion (perform a hard delete) if there are any forked collections downstream that are still alive. If we violated this invariant, there would be a missing edge in the lineage file (resulting in an unconnected graph).
745+ let mut dfs = petgraph:: visit:: Dfs :: new ( graph, first_collection_node) ;
746+ let mut seen_collection_ids: HashSet < CollectionUuid > = HashSet :: new ( ) ;
747+
748+ while let Some ( nx) = dfs. next ( & graph) {
749+ let node = graph. node_weight ( nx) . expect ( "Node should exist" ) ;
750+ seen_collection_ids. insert ( node. collection_id ) ;
751+ }
752+
753+ let are_all_children_in_fork_tree_also_soft_deleted =
754+ seen_collection_ids. iter ( ) . all ( |collection_id| {
755+ self . soft_deleted_collections_to_gc . contains ( collection_id)
756+ } ) ;
757+
758+ if are_all_children_in_fork_tree_also_soft_deleted {
759+ self . sysdb_client
760+ . finish_collection_deletion (
761+ self . tenant . clone ( ) . ok_or (
762+ GarbageCollectorError :: InvariantViolation (
763+ "Expected tenant to be set" . to_string ( ) ,
764+ ) ,
765+ ) ?,
766+ self . database_name . clone ( ) . ok_or (
767+ GarbageCollectorError :: InvariantViolation (
768+ "Expected database to be set" . to_string ( ) ,
769+ ) ,
770+ ) ?,
771+ * collection_id,
772+ )
773+ . await ?;
774+ }
775+ }
776+
777+ let response = GarbageCollectorResponse {
778+ num_files_deleted : self . num_files_deleted ,
779+ num_versions_deleted : self . num_versions_deleted ,
780+ } ;
781+
782+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
783+ }
784+
785+ Ok ( ( ) )
786+ }
662787}
663788
664789#[ async_trait]
@@ -774,18 +899,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774899 Some ( output) => output,
775900 None => return ,
776901 } ;
777- tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
778- self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
779902
780- self . num_pending_tasks -= 1 ;
781- if self . num_pending_tasks == 0 {
782- let response = GarbageCollectorResponse {
783- num_files_deleted : self . num_files_deleted ,
784- num_versions_deleted : self . num_versions_deleted ,
785- } ;
786-
787- self . terminate_with_result ( Ok ( response) , ctx) . await ;
788- }
903+ let res = self . handle_delete_versions_output ( output, ctx) . await ;
904+ self . ok_or_terminate ( res, ctx) . await ;
789905 }
790906}
791907
0 commit comments