@@ -32,8 +32,9 @@ use chroma_system::{
3232 PanicError , System , TaskError , TaskResult ,
3333} ;
3434use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
35- use chroma_types:: CollectionUuid ;
35+ use chroma_types:: { CollectionUuid , DeleteCollectionError } ;
3636use chrono:: { DateTime , Utc } ;
37+ use petgraph:: visit:: NodeCount ;
3738use std:: collections:: { HashMap , HashSet } ;
3839use std:: str:: FromStr ;
3940use thiserror:: Error ;
@@ -61,6 +62,9 @@ pub struct GarbageCollectorOrchestrator {
6162 num_pending_tasks : usize ,
6263 min_versions_to_keep : u32 ,
6364 graph : Option < VersionGraph > ,
65+ soft_deleted_collections_to_gc : HashSet < CollectionUuid > ,
66+ tenant : Option < String > ,
67+ database_name : Option < String > ,
6468
6569 num_files_deleted : u32 ,
6670 num_versions_deleted : u32 ,
@@ -107,6 +111,9 @@ impl GarbageCollectorOrchestrator {
107111 num_pending_tasks : 0 ,
108112 min_versions_to_keep,
109113 graph : None ,
114+ soft_deleted_collections_to_gc : HashSet :: new ( ) ,
115+ tenant : None ,
116+ database_name : None ,
110117
111118 num_files_deleted : 0 ,
112119 num_versions_deleted : 0 ,
@@ -146,6 +153,8 @@ pub enum GarbageCollectorError {
146153 InvariantViolation ( String ) ,
147154 #[ error( "Could not parse UUID: {0}" ) ]
148155 UnparsableUuid ( #[ from] uuid:: Error ) ,
156+ #[ error( "Collection deletion failed: {0}" ) ]
157+ CollectionDeletionFailed ( #[ from] DeleteCollectionError ) ,
149158}
150159
151160impl ChromaError for GarbageCollectorError {
@@ -216,13 +225,35 @@ impl GarbageCollectorOrchestrator {
216225 self . lineage_file_path . clone ( ) ,
217226 ) ;
218227 let output = orchestrator. run ( self . system . clone ( ) ) . await ?;
228+
229+ let collection_ids = output. version_files . keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
230+ let soft_delete_statuses = self
231+ . sysdb_client
232+ . batch_get_collection_soft_delete_status ( collection_ids)
233+ . await
234+ . unwrap ( ) ; // todo
235+ // todo: filter out root collection?
236+ self . soft_deleted_collections_to_gc = soft_delete_statuses
237+ . iter ( )
238+ . filter_map (
239+ |( collection_id, status) | {
240+ if * status {
241+ Some ( * collection_id)
242+ } else {
243+ None
244+ }
245+ } ,
246+ )
247+ . collect ( ) ;
248+
219249 self . version_files = output. version_files ;
220250 self . graph = Some ( output. graph . clone ( ) ) ;
221251
222252 let task = wrap (
223253 Box :: new ( ComputeVersionsToDeleteOperator { } ) ,
224254 ComputeVersionsToDeleteInput {
225255 graph : output. graph ,
256+ soft_deleted_collections : self . soft_deleted_collections_to_gc . clone ( ) ,
226257 cutoff_time : self . absolute_cutoff_time ,
227258 min_versions_to_keep : self . min_versions_to_keep ,
228259 } ,
@@ -262,6 +293,13 @@ impl GarbageCollectorOrchestrator {
262293 } )
263294 . collect ( ) ;
264295
296+ self . pending_mark_versions_at_sysdb_tasks = output
297+ . versions
298+ . keys ( )
299+ . filter ( |collection_id| !self . soft_deleted_collections_to_gc . contains ( & collection_id) )
300+ . cloned ( )
301+ . collect ( ) ;
302+
265303 for ( collection_id, versions) in & output. versions {
266304 let version_file = self
267305 . version_files
@@ -398,7 +436,7 @@ impl GarbageCollectorOrchestrator {
398436 output. collection_id, output. version
399437 ) ) ) ?;
400438
401- let root = graph
439+ let root_index = graph
402440 . node_indices ( )
403441 . find ( |& n| {
404442 graph
@@ -409,20 +447,26 @@ impl GarbageCollectorOrchestrator {
409447 . ok_or ( GarbageCollectorError :: InvariantViolation (
410448 "Expected to find root node" . to_string ( ) ,
411449 ) ) ?;
412-
413- let versions_from_root_to_this_node =
414- petgraph:: algo:: astar ( graph, root, |finish| finish == this_node, |_| 1 , |_| 0 )
415- . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
416- "Expected to find path from root to node for {}@v{}" ,
417- output. collection_id, output. version
418- ) ) ) ?
419- . 1
420- . into_iter ( )
421- . map ( |i| {
422- let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
423- node. version
424- } )
425- . collect :: < Vec < _ > > ( ) ;
450+ let root = graph. node_weight ( root_index) . expect ( "Node should exist" ) ;
451+
452+ let versions_from_root_to_this_node = petgraph:: algo:: astar (
453+ graph,
454+ root_index,
455+ |finish| finish == this_node,
456+ |_| 1 ,
457+ |_| 0 ,
458+ )
459+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
460+ "Expected to find path from root ({}@v{}) to node for {}@v{}" ,
461+ root. collection_id, root. version, output. collection_id, output. version
462+ ) ) ) ?
463+ . 1
464+ . into_iter ( )
465+ . map ( |i| {
466+ let node = graph. node_weight ( i) . expect ( "Node should exist" ) ;
467+ node. version
468+ } )
469+ . collect :: < Vec < _ > > ( ) ;
426470 let are_all_versions_v0 = versions_from_root_to_this_node
427471 . iter ( )
428472 . all ( |& version| version == 0 ) ;
@@ -523,14 +567,15 @@ impl GarbageCollectorOrchestrator {
523567 "Expected there to be at least one version file" . to_string ( ) ,
524568 ) ) ?;
525569 // Assumes that all collections in a fork tree are under the same tenant
526- let tenant_id = version_file
527- . collection_info_immutable
528- . as_ref ( )
529- . ok_or ( GarbageCollectorError :: InvariantViolation (
570+ let collection_info = version_file. collection_info_immutable . as_ref ( ) . ok_or (
571+ GarbageCollectorError :: InvariantViolation (
530572 "Expected collection_info_immutable to be set" . to_string ( ) ,
531- ) ) ?
532- . tenant_id
533- . clone ( ) ;
573+ ) ,
574+ ) ?;
575+ let tenant_id = collection_info. tenant_id . clone ( ) ;
576+ self . tenant = Some ( tenant_id. clone ( ) ) ;
577+ let database_name = collection_info. database_name . clone ( ) ;
578+ self . database_name = Some ( database_name. clone ( ) ) ;
534579
535580 let task = wrap (
536581 Box :: new ( DeleteUnusedFilesOperator :: new (
@@ -659,6 +704,91 @@ impl GarbageCollectorOrchestrator {
659704
660705 Ok ( ( ) )
661706 }
707+
708+ async fn handle_delete_versions_output (
709+ & mut self ,
710+ output : DeleteVersionsAtSysDbOutput ,
711+ ctx : & ComponentContext < Self > ,
712+ ) -> Result < ( ) , GarbageCollectorError > {
713+ tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
714+ self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
715+
716+ self . num_pending_tasks -= 1 ;
717+ if self . num_pending_tasks == 0 {
718+ for collection_id in self . soft_deleted_collections_to_gc . drain ( ) {
719+ let graph =
720+ self . graph
721+ . as_ref ( )
722+ . ok_or ( GarbageCollectorError :: InvariantViolation (
723+ "Expected graph to be set" . to_string ( ) ,
724+ ) ) ?;
725+
726+ let first_collection_node = graph
727+ . node_indices ( )
728+ . filter ( |& n| {
729+ let node = graph. node_weight ( n) . expect ( "Node should exist" ) ;
730+ node. collection_id == collection_id
731+ } )
732+ . max_by ( |a, b| {
733+ // todo: does this sort in the right direction?
734+ let a_node = graph. node_weight ( * a) . expect ( "Node should exist" ) ;
735+ let b_node = graph. node_weight ( * b) . expect ( "Node should exist" ) ;
736+ b_node. version . cmp ( & a_node. version )
737+ } )
738+ . ok_or ( GarbageCollectorError :: InvariantViolation ( format ! (
739+ "Expected to find node for collection {}" ,
740+ collection_id
741+ ) ) ) ?;
742+
743+ let first_node = graph
744+ . node_weight ( first_collection_node)
745+ . expect ( "Node should exist" ) ;
746+ tracing:: debug!(
747+ "First node for collection {}: {:#?}" ,
748+ collection_id,
749+ first_node
750+ ) ;
751+
752+ let mut dfs = petgraph:: visit:: Dfs :: new ( graph, first_collection_node) ;
753+ let mut seen_collection_ids: HashSet < CollectionUuid > = HashSet :: new ( ) ;
754+
755+ while let Some ( nx) = dfs. next ( & graph) {
756+ let node = graph. node_weight ( nx) . expect ( "Node should exist" ) ;
757+ seen_collection_ids. insert ( node. collection_id ) ;
758+ }
759+
760+ let is_leaf_node_in_fork_tree =
761+ seen_collection_ids. len ( ) == 1 && seen_collection_ids. contains ( & collection_id) ;
762+
763+ if is_leaf_node_in_fork_tree {
764+ self . sysdb_client
765+ . finish_collection_deletion (
766+ self . tenant . clone ( ) . ok_or (
767+ GarbageCollectorError :: InvariantViolation (
768+ "Expected tenant to be set" . to_string ( ) ,
769+ ) ,
770+ ) ?,
771+ self . database_name . clone ( ) . ok_or (
772+ GarbageCollectorError :: InvariantViolation (
773+ "Expected database to be set" . to_string ( ) ,
774+ ) ,
775+ ) ?,
776+ collection_id,
777+ )
778+ . await ?;
779+ }
780+ }
781+
782+ let response = GarbageCollectorResponse {
783+ num_files_deleted : self . num_files_deleted ,
784+ num_versions_deleted : self . num_versions_deleted ,
785+ } ;
786+
787+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
788+ }
789+
790+ Ok ( ( ) )
791+ }
662792}
663793
664794#[ async_trait]
@@ -774,18 +904,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774904 Some ( output) => output,
775905 None => return ,
776906 } ;
777- tracing:: trace!( "Received DeleteVersionsAtSysDbOutput: {:#?}" , output) ;
778- self . num_versions_deleted += output. versions_to_delete . versions . len ( ) as u32 ;
779-
780- self . num_pending_tasks -= 1 ;
781- if self . num_pending_tasks == 0 {
782- let response = GarbageCollectorResponse {
783- num_files_deleted : self . num_files_deleted ,
784- num_versions_deleted : self . num_versions_deleted ,
785- } ;
786907
787- self . terminate_with_result ( Ok ( response ) , ctx) . await ;
788- }
908+ let res = self . handle_delete_versions_output ( output , ctx) . await ;
909+ self . ok_or_terminate ( res , ctx ) . await ;
789910 }
790911}
791912
0 commit comments