@@ -32,7 +32,9 @@ use chroma_system::{
3232 PanicError , System , TaskError , TaskResult ,
3333} ;
3434use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
35- use chroma_types:: CollectionUuid ;
35+ use chroma_types:: {
36+ BatchGetCollectionSoftDeleteStatusError , CollectionUuid , DeleteCollectionError ,
37+ } ;
3638use chrono:: { DateTime , Utc } ;
3739use std:: collections:: { HashMap , HashSet } ;
3840use std:: str:: FromStr ;
@@ -61,6 +63,9 @@ pub struct GarbageCollectorOrchestrator {
6163 num_pending_tasks : usize ,
6264 min_versions_to_keep : u32 ,
6365 graph : Option < VersionGraph > ,
66+ soft_deleted_collections_to_gc : HashSet < CollectionUuid > ,
67+ tenant : Option < String > ,
68+ database_name : Option < String > ,
6469
6570 num_files_deleted : u32 ,
6671 num_versions_deleted : u32 ,
@@ -107,6 +112,9 @@ impl GarbageCollectorOrchestrator {
107112 num_pending_tasks : 0 ,
108113 min_versions_to_keep,
109114 graph : None ,
115+ soft_deleted_collections_to_gc : HashSet :: new ( ) ,
116+ tenant : None ,
117+ database_name : None ,
110118
111119 num_files_deleted : 0 ,
112120 num_versions_deleted : 0 ,
@@ -127,6 +135,8 @@ pub enum GarbageCollectorError {
127135 #[ error( "The task was aborted because resources were exhausted" ) ]
128136 Aborted ,
129137
138+ #[ error( "Failed to get collection soft delete status: {0}" ) ]
139+ BatchGetCollectionSoftDeleteStatus ( #[ from] BatchGetCollectionSoftDeleteStatusError ) ,
130140 #[ error( "Failed to construct version graph: {0}" ) ]
131141 ConstructVersionGraph ( #[ from] ConstructVersionGraphError ) ,
132142 #[ error( "Failed to compute versions to delete: {0}" ) ]
@@ -146,6 +156,8 @@ pub enum GarbageCollectorError {
146156 InvariantViolation ( String ) ,
147157 #[ error( "Could not parse UUID: {0}" ) ]
148158 UnparsableUuid ( #[ from] uuid:: Error ) ,
159+ #[ error( "Collection deletion failed: {0}" ) ]
160+ CollectionDeletionFailed ( #[ from] DeleteCollectionError ) ,
149161}
150162
151163impl ChromaError for GarbageCollectorError {
@@ -216,13 +228,33 @@ impl GarbageCollectorOrchestrator {
216228 self . lineage_file_path . clone ( ) ,
217229 ) ;
218230 let output = orchestrator. run ( self . system . clone ( ) ) . await ?;
231+
232+ let collection_ids = output. version_files . keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
233+ let soft_delete_statuses = self
234+ . sysdb_client
235+ . batch_get_collection_soft_delete_status ( collection_ids)
236+ . await ?;
237+ self . soft_deleted_collections_to_gc = soft_delete_statuses
238+ . iter ( )
239+ . filter_map (
240+ |( collection_id, status) | {
241+ if * status {
242+ Some ( * collection_id)
243+ } else {
244+ None
245+ }
246+ } ,
247+ )
248+ . collect ( ) ;
249+
219250 self . version_files = output. version_files ;
220251 self . graph = Some ( output. graph . clone ( ) ) ;
221252
222253 let task = wrap (
223254 Box :: new ( ComputeVersionsToDeleteOperator { } ) ,
224255 ComputeVersionsToDeleteInput {
225256 graph : output. graph ,
257+ soft_deleted_collections : self . soft_deleted_collections_to_gc . clone ( ) ,
226258 cutoff_time : self . absolute_cutoff_time ,
227259 min_versions_to_keep : self . min_versions_to_keep ,
228260 } ,
@@ -262,6 +294,13 @@ impl GarbageCollectorOrchestrator {
262294 } )
263295 . collect ( ) ;
264296
297+ self . pending_mark_versions_at_sysdb_tasks = output
298+ . versions
299+ . keys ( )
300+ . filter ( |collection_id| !self . soft_deleted_collections_to_gc . contains ( collection_id) )
301+ . cloned ( )
302+ . collect ( ) ;
303+
265304 for ( collection_id, versions) in & output. versions {
266305 let version_file = self
267306 . version_files
@@ -398,7 +437,7 @@ impl GarbageCollectorOrchestrator {
398437 output. collection_id, output. version
399438 ) ) ) ?;
400439
401- let root = graph
440+ let root_index = graph
402441 . node_indices ( )
403442 . find ( |& n| {
404443 graph
@@ -409,20 +448,26 @@ impl GarbageCollectorOrchestrator {
409448 . ok_or ( GarbageCollectorError :: InvariantViolation (
410449 "Expected to find root node" . to_string ( ) ,
411450 ) ) ?;
412-
413- let versions_from_root_to_this_node =
414- petgraph:: algo:: astar ( graph, root, |finish| finish == this_node, |_| 1 , |_| 0 )
415- . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
416- "Expected to find path from root to node for {}@v{}" ,
417- output. collection_id, output. version
418- ) ) ) ?
419- . 1
420- . into_iter ( )
421- . map ( |i| {
422- let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
423- node. version
424- } )
425- . collect :: < Vec < _ > > ( ) ;
451+ let root = graph. node_weight ( root_index) . expect ( "Node should exist" ) ;
452+
453+ let versions_from_root_to_this_node = petgraph:: algo:: astar (
454+ graph,
455+ root_index,
456+ |finish| finish == this_node,
457+ |_| 1 ,
458+ |_| 0 ,
459+ )
460+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
461+ "Expected to find path from root ({}@v{}) to node for {}@v{}" ,
462+ root. collection_id, root. version, output. collection_id, output. version
463+ ) ) ) ?
464+ . 1
465+ . into_iter ( )
466+ . map ( |i| {
467+ let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
468+ node. version
469+ } )
470+ . collect :: < Vec < _ > > ( ) ;
426471 let are_all_versions_v0 = versions_from_root_to_this_node
427472 . iter ( )
428473 . all ( |& version| version == 0 ) ;
@@ -523,14 +568,15 @@ impl GarbageCollectorOrchestrator {
523568 "Expected there to be at least one version file" . to_string ( ) ,
524569 ) ) ?;
525570 // Assumes that all collections in a fork tree are under the same tenant
526- let tenant_id = version_file
527- . collection_info_immutable
528- . as_ref ( )
529- . ok_or ( GarbageCollectorError :: InvariantViolation (
571+ let collection_info = version_file. collection_info_immutable . as_ref ( ) . ok_or (
572+ GarbageCollectorError :: InvariantViolation (
530573 "Expected collection_info_immutable to be set" . to_string ( ) ,
531- ) ) ?
532- . tenant_id
533- . clone ( ) ;
574+ ) ,
575+ ) ?;
576+ let tenant_id = collection_info. tenant_id . clone ( ) ;
577+ self . tenant = Some ( tenant_id. clone ( ) ) ;
578+ let database_name = collection_info. database_name . clone ( ) ;
579+ self . database_name = Some ( database_name. clone ( ) ) ;
534580
535581 let task = wrap (
536582 Box :: new ( DeleteUnusedFilesOperator :: new (
@@ -659,6 +705,92 @@ impl GarbageCollectorOrchestrator {
659705
660706 Ok ( ( ) )
661707 }
708+
709+ async fn handle_delete_versions_output (
710+ & mut self ,
711+ output : DeleteVersionsAtSysDbOutput ,
712+ ctx : & ComponentContext < Self > ,
713+ ) -> Result < ( ) , GarbageCollectorError > {
714+ tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
715+ self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
716+
717+ self . num_pending_tasks -= 1 ;
718+ if self . num_pending_tasks == 0 {
719+ for collection_id in self . soft_deleted_collections_to_gc . iter ( ) {
720+ let graph =
721+ self . graph
722+ . as_ref ( )
723+ . ok_or ( GarbageCollectorError :: InvariantViolation (
724+ "Expected graph to be set" . to_string ( ) ,
725+ ) ) ?;
726+
727+ let first_collection_node = graph
728+ . node_indices ( )
729+ . filter ( |& n| {
730+ let node = graph. node_weight ( n) . expect ( "Node should exist" ) ;
731+ node. collection_id == * collection_id
732+ } )
733+ . max_by ( |a, b| {
734+ let a_node = graph. node_weight ( * a) . expect ( "Node should exist" ) ;
735+ let b_node = graph. node_weight ( * b) . expect ( "Node should exist" ) ;
736+ b_node. version . cmp ( & a_node. version )
737+ } )
738+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
739+ "Expected to find node for collection {}" ,
740+ collection_id
741+ ) ) ) ?;
742+
743+ let first_node = graph
744+ . node_weight ( first_collection_node)
745+ . expect ( "Node should exist" ) ;
746+ tracing:: debug!(
747+ "First node for collection {}: {:#?}" ,
748+ collection_id,
749+ first_node
750+ ) ;
751+
752+ let mut dfs = petgraph:: visit:: Dfs :: new ( graph, first_collection_node) ;
753+ let mut seen_collection_ids: HashSet < CollectionUuid > = HashSet :: new ( ) ;
754+
755+ while let Some ( nx) = dfs. next ( & graph) {
756+ let node = graph. node_weight ( nx) . expect ( "Node should exist" ) ;
757+ seen_collection_ids. insert ( node. collection_id ) ;
758+ }
759+
760+ let are_all_children_in_fork_tree_also_soft_deleted =
761+ seen_collection_ids. iter ( ) . all ( |collection_id| {
762+ self . soft_deleted_collections_to_gc . contains ( collection_id)
763+ } ) ;
764+
765+ if are_all_children_in_fork_tree_also_soft_deleted {
766+ self . sysdb_client
767+ . finish_collection_deletion (
768+ self . tenant . clone ( ) . ok_or (
769+ GarbageCollectorError :: InvariantViolation (
770+ "Expected tenant to be set" . to_string ( ) ,
771+ ) ,
772+ ) ?,
773+ self . database_name . clone ( ) . ok_or (
774+ GarbageCollectorError :: InvariantViolation (
775+ "Expected database to be set" . to_string ( ) ,
776+ ) ,
777+ ) ?,
778+ * collection_id,
779+ )
780+ . await ?;
781+ }
782+ }
783+
784+ let response = GarbageCollectorResponse {
785+ num_files_deleted : self . num_files_deleted ,
786+ num_versions_deleted : self . num_versions_deleted ,
787+ } ;
788+
789+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
790+ }
791+
792+ Ok ( ( ) )
793+ }
662794}
663795
664796#[ async_trait]
@@ -774,18 +906,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774906 Some ( output) => output,
775907 None => return ,
776908 } ;
777- tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
778- self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
779-
780- self . num_pending_tasks -= 1 ;
781- if self . num_pending_tasks == 0 {
782- let response = GarbageCollectorResponse {
783- num_files_deleted : self . num_files_deleted ,
784- num_versions_deleted : self . num_versions_deleted ,
785- } ;
786909
787- self . terminate_with_result ( Ok ( response ) , ctx) . await ;
788- }
910+ let res = self . handle_delete_versions_output ( output , ctx) . await ;
911+ self . ok_or_terminate ( res , ctx ) . await ;
789912 }
790913}
791914
0 commit comments