@@ -32,7 +32,7 @@ use chroma_system::{
3232 PanicError , System , TaskError , TaskResult ,
3333} ;
3434use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
35- use chroma_types:: CollectionUuid ;
35+ use chroma_types:: { CollectionUuid , DeleteCollectionError } ;
3636use chrono:: { DateTime , Utc } ;
3737use std:: collections:: { HashMap , HashSet } ;
3838use std:: str:: FromStr ;
@@ -61,6 +61,9 @@ pub struct GarbageCollectorOrchestrator {
6161 num_pending_tasks : usize ,
6262 min_versions_to_keep : u32 ,
6363 graph : Option < VersionGraph > ,
64+ soft_deleted_collections_to_gc : HashSet < CollectionUuid > ,
65+ tenant : Option < String > ,
66+ database_name : Option < String > ,
6467
6568 num_files_deleted : u32 ,
6669 num_versions_deleted : u32 ,
@@ -107,6 +110,9 @@ impl GarbageCollectorOrchestrator {
107110 num_pending_tasks : 0 ,
108111 min_versions_to_keep,
109112 graph : None ,
113+ soft_deleted_collections_to_gc : HashSet :: new ( ) ,
114+ tenant : None ,
115+ database_name : None ,
110116
111117 num_files_deleted : 0 ,
112118 num_versions_deleted : 0 ,
@@ -146,6 +152,8 @@ pub enum GarbageCollectorError {
146152 InvariantViolation ( String ) ,
147153 #[ error( "Could not parse UUID: {0}" ) ]
148154 UnparsableUuid ( #[ from] uuid:: Error ) ,
155+ #[ error( "Collection deletion failed: {0}" ) ]
156+ CollectionDeletionFailed ( #[ from] DeleteCollectionError ) ,
149157}
150158
151159impl ChromaError for GarbageCollectorError {
@@ -216,13 +224,35 @@ impl GarbageCollectorOrchestrator {
216224 self . lineage_file_path . clone ( ) ,
217225 ) ;
218226 let output = orchestrator. run ( self . system . clone ( ) ) . await ?;
227+
228+ let collection_ids = output. version_files . keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
229+ let soft_delete_statuses = self
230+ . sysdb_client
231+ . batch_get_collection_soft_delete_status ( collection_ids)
232+ . await
233+ . unwrap ( ) ; // todo
234+ // todo: filter out root collection?
235+ self . soft_deleted_collections_to_gc = soft_delete_statuses
236+ . iter ( )
237+ . filter_map (
238+ |( collection_id, status) | {
239+ if * status {
240+ Some ( * collection_id)
241+ } else {
242+ None
243+ }
244+ } ,
245+ )
246+ . collect ( ) ;
247+
219248 self . version_files = output. version_files ;
220249 self . graph = Some ( output. graph . clone ( ) ) ;
221250
222251 let task = wrap (
223252 Box :: new ( ComputeVersionsToDeleteOperator { } ) ,
224253 ComputeVersionsToDeleteInput {
225254 graph : output. graph ,
255+ soft_deleted_collections : self . soft_deleted_collections_to_gc . clone ( ) ,
226256 cutoff_time : self . absolute_cutoff_time ,
227257 min_versions_to_keep : self . min_versions_to_keep ,
228258 } ,
@@ -262,6 +292,13 @@ impl GarbageCollectorOrchestrator {
262292 } )
263293 . collect ( ) ;
264294
295+ self . pending_mark_versions_at_sysdb_tasks = output
296+ . versions
297+ . keys ( )
298+ . filter ( |collection_id| !self . soft_deleted_collections_to_gc . contains ( collection_id) )
299+ . cloned ( )
300+ . collect ( ) ;
301+
265302 for ( collection_id, versions) in & output. versions {
266303 let version_file = self
267304 . version_files
@@ -398,7 +435,7 @@ impl GarbageCollectorOrchestrator {
398435 output. collection_id, output. version
399436 ) ) ) ?;
400437
401- let root = graph
438+ let root_index = graph
402439 . node_indices ( )
403440 . find ( |& n| {
404441 graph
@@ -409,20 +446,26 @@ impl GarbageCollectorOrchestrator {
409446 . ok_or ( GarbageCollectorError :: InvariantViolation (
410447 "Expected to find root node" . to_string ( ) ,
411448 ) ) ?;
412-
413- let versions_from_root_to_this_node =
414- petgraph:: algo:: astar ( graph, root, |finish| finish == this_node, |_| 1 , |_| 0 )
415- . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
416- "Expected to find path from root to node for {}@v{}" ,
417- output. collection_id, output. version
418- ) ) ) ?
419- . 1
420- . into_iter ( )
421- . map ( |i| {
422- let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
423- node. version
424- } )
425- . collect :: < Vec < _ > > ( ) ;
449+ let root = graph. node_weight ( root_index) . expect ( "Node should exist" ) ;
450+
451+ let versions_from_root_to_this_node = petgraph:: algo:: astar (
452+ graph,
453+ root_index,
454+ |finish| finish == this_node,
455+ |_| 1 ,
456+ |_| 0 ,
457+ )
458+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
459+ "Expected to find path from root ({}@v{}) to node for {}@v{}" ,
460+ root. collection_id, root. version, output. collection_id, output. version
461+ ) ) ) ?
462+ . 1
463+ . into_iter ( )
464+ . map ( |i| {
465+ let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
466+ node. version
467+ } )
468+ . collect :: < Vec < _ > > ( ) ;
426469 let are_all_versions_v0 = versions_from_root_to_this_node
427470 . iter ( )
428471 . all ( |& version| version == 0 ) ;
@@ -523,14 +566,15 @@ impl GarbageCollectorOrchestrator {
523566 "Expected there to be at least one version file" . to_string ( ) ,
524567 ) ) ?;
525568 // Assumes that all collections in a fork tree are under the same tenant
526- let tenant_id = version_file
527- . collection_info_immutable
528- . as_ref ( )
529- . ok_or ( GarbageCollectorError :: InvariantViolation (
569+ let collection_info = version_file. collection_info_immutable . as_ref ( ) . ok_or (
570+ GarbageCollectorError :: InvariantViolation (
530571 "Expected collection_info_immutable to be set" . to_string ( ) ,
531- ) ) ?
532- . tenant_id
533- . clone ( ) ;
572+ ) ,
573+ ) ?;
574+ let tenant_id = collection_info. tenant_id . clone ( ) ;
575+ self . tenant = Some ( tenant_id. clone ( ) ) ;
576+ let database_name = collection_info. database_name . clone ( ) ;
577+ self . database_name = Some ( database_name. clone ( ) ) ;
534578
535579 let task = wrap (
536580 Box :: new ( DeleteUnusedFilesOperator :: new (
@@ -659,6 +703,91 @@ impl GarbageCollectorOrchestrator {
659703
660704 Ok ( ( ) )
661705 }
706+
707+ async fn handle_delete_versions_output (
708+ & mut self ,
709+ output : DeleteVersionsAtSysDbOutput ,
710+ ctx : & ComponentContext < Self > ,
711+ ) -> Result < ( ) , GarbageCollectorError > {
712+ tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
713+ self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
714+
715+ self . num_pending_tasks -= 1 ;
716+ if self . num_pending_tasks == 0 {
717+ for collection_id in self . soft_deleted_collections_to_gc . drain ( ) {
718+ let graph =
719+ self . graph
720+ . as_ref ( )
721+ . ok_or ( GarbageCollectorError :: InvariantViolation (
722+ "Expected graph to be set" . to_string ( ) ,
723+ ) ) ?;
724+
725+ let first_collection_node = graph
726+ . node_indices ( )
727+ . filter ( |& n| {
728+ let node = graph. node_weight ( n) . expect ( "Node should exist" ) ;
729+ node. collection_id == collection_id
730+ } )
731+ . max_by ( |a, b| {
732+ // todo: does this sort in the right direction?
733+ let a_node = graph. node_weight ( * a) . expect ( "Node should exist" ) ;
734+ let b_node = graph. node_weight ( * b) . expect ( "Node should exist" ) ;
735+ b_node. version . cmp ( & a_node. version )
736+ } )
737+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
738+ "Expected to find node for collection {}" ,
739+ collection_id
740+ ) ) ) ?;
741+
742+ let first_node = graph
743+ . node_weight ( first_collection_node)
744+ . expect ( "Node should exist" ) ;
745+ tracing:: debug!(
746+ "First node for collection {}: {:#?}" ,
747+ collection_id,
748+ first_node
749+ ) ;
750+
751+ let mut dfs = petgraph:: visit:: Dfs :: new ( graph, first_collection_node) ;
752+ let mut seen_collection_ids: HashSet < CollectionUuid > = HashSet :: new ( ) ;
753+
754+ while let Some ( nx) = dfs. next ( & graph) {
755+ let node = graph. node_weight ( nx) . expect ( "Node should exist" ) ;
756+ seen_collection_ids. insert ( node. collection_id ) ;
757+ }
758+
759+ let is_leaf_node_in_fork_tree =
760+ seen_collection_ids. len ( ) == 1 && seen_collection_ids. contains ( & collection_id) ;
761+
762+ if is_leaf_node_in_fork_tree {
763+ self . sysdb_client
764+ . finish_collection_deletion (
765+ self . tenant . clone ( ) . ok_or (
766+ GarbageCollectorError :: InvariantViolation (
767+ "Expected tenant to be set" . to_string ( ) ,
768+ ) ,
769+ ) ?,
770+ self . database_name . clone ( ) . ok_or (
771+ GarbageCollectorError :: InvariantViolation (
772+ "Expected database to be set" . to_string ( ) ,
773+ ) ,
774+ ) ?,
775+ collection_id,
776+ )
777+ . await ?;
778+ }
779+ }
780+
781+ let response = GarbageCollectorResponse {
782+ num_files_deleted : self . num_files_deleted ,
783+ num_versions_deleted : self . num_versions_deleted ,
784+ } ;
785+
786+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
787+ }
788+
789+ Ok ( ( ) )
790+ }
662791}
663792
664793#[ async_trait]
@@ -774,18 +903,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774903 Some ( output) => output,
775904 None => return ,
776905 } ;
777- tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
778- self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
779-
780- self . num_pending_tasks -= 1 ;
781- if self . num_pending_tasks == 0 {
782- let response = GarbageCollectorResponse {
783- num_files_deleted : self . num_files_deleted ,
784- num_versions_deleted : self . num_versions_deleted ,
785- } ;
786906
787- self . terminate_with_result ( Ok ( response ) , ctx) . await ;
788- }
907+ let res = self . handle_delete_versions_output ( output , ctx) . await ;
908+ self . ok_or_terminate ( res , ctx ) . await ;
789909 }
790910}
791911
0 commit comments