@@ -9,6 +9,10 @@ use crate::operators::delete_unused_files::{
99 DeleteUnusedFilesError , DeleteUnusedFilesInput , DeleteUnusedFilesOperator ,
1010 DeleteUnusedFilesOutput ,
1111} ;
12+ use crate :: operators:: delete_versions_at_sysdb:: {
13+ DeleteVersionsAtSysDbError , DeleteVersionsAtSysDbInput , DeleteVersionsAtSysDbOperator ,
14+ DeleteVersionsAtSysDbOutput ,
15+ } ;
1216use crate :: operators:: list_files_at_version:: {
1317 ListFilesAtVersionError , ListFilesAtVersionInput , ListFilesAtVersionOutput ,
1418 ListFilesAtVersionsOperator ,
@@ -23,7 +27,7 @@ use chroma_system::{
2327 wrap, ChannelError , ComponentContext , ComponentHandle , Dispatcher , Handler , Orchestrator ,
2428 PanicError , System , TaskError , TaskMessage , TaskResult ,
2529} ;
26- use chroma_types:: chroma_proto:: CollectionVersionFile ;
30+ use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
2731use chroma_types:: CollectionUuid ;
2832use chrono:: { DateTime , Utc } ;
2933use std:: collections:: { HashMap , HashSet } ;
@@ -43,15 +47,12 @@ pub struct GarbageCollectorOrchestrator {
4347 storage : Storage ,
4448 root_manager : RootManager ,
4549 result_channel : Option < Sender < Result < GarbageCollectorResponse , GarbageCollectorError > > > ,
46- pending_version_file : Option < CollectionVersionFile > ,
47- pending_versions_to_delete : Option < chroma_types:: chroma_proto:: VersionListForCollection > ,
4850 pending_epoch_id : Option < i64 > ,
49- num_versions_deleted : u32 ,
50- deletion_list : Vec < String > ,
5151 cleanup_mode : CleanupMode ,
5252 version_files : HashMap < CollectionUuid , CollectionVersionFile > ,
5353 versions_to_delete_output : Option < ComputeVersionsToDeleteOutput > ,
5454 file_ref_counts : HashMap < String , u32 > ,
55+ num_pending_tasks : usize ,
5556}
5657
5758impl Debug for GarbageCollectorOrchestrator {
@@ -60,13 +61,10 @@ impl Debug for GarbageCollectorOrchestrator {
6061 }
6162}
6263
63- #[ allow( dead_code) ]
6464#[ derive( Debug ) ]
6565pub struct GarbageCollectorResponse {
66- pub collection_id : CollectionUuid ,
67- pub version_file_path : String ,
6866 pub num_versions_deleted : u32 ,
69- pub deletion_list : Vec < String > ,
67+ pub num_files_deleted : u32 ,
7068}
7169
7270#[ allow( clippy:: too_many_arguments) ]
@@ -93,14 +91,11 @@ impl GarbageCollectorOrchestrator {
9391 root_manager,
9492 cleanup_mode,
9593 result_channel : None ,
96- pending_version_file : None ,
97- pending_versions_to_delete : None ,
9894 pending_epoch_id : None ,
99- num_versions_deleted : 0 ,
100- deletion_list : Vec :: new ( ) ,
10195 version_files : HashMap :: new ( ) ,
10296 file_ref_counts : HashMap :: new ( ) ,
10397 versions_to_delete_output : None ,
98+ num_pending_tasks : 0 ,
10499 }
105100 }
106101}
@@ -126,6 +121,8 @@ pub enum GarbageCollectorError {
126121 ListFilesAtVersion ( #[ from] ListFilesAtVersionError ) ,
127122 #[ error( "Failed to delete unused files: {0}" ) ]
128123 DeleteUnusedFiles ( #[ from] DeleteUnusedFilesError ) ,
124+ #[ error( "Failed to delete versions at sysdb: {0}" ) ]
125+ DeleteVersionsAtSysDb ( #[ from] DeleteVersionsAtSysDbError ) ,
129126}
130127
131128impl ChromaError for GarbageCollectorError {
@@ -413,5 +410,107 @@ impl Handler<TaskResult<DeleteUnusedFilesOutput, DeleteUnusedFilesError>>
413410 Some ( output) => output,
414411 None => return ,
415412 } ;
413+
414+ if self . cleanup_mode == CleanupMode :: DryRun {
415+ tracing:: info!( "Dry run mode, skipping actual deletion" ) ;
416+ let response = GarbageCollectorResponse {
417+ num_versions_deleted : 0 ,
418+ num_files_deleted : 0 ,
419+ } ;
420+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
421+ return ;
422+ }
423+
424+ // todo: was previously mutated?
425+ let versions_to_delete = self . versions_to_delete_output . as_ref ( ) . unwrap ( ) ;
426+
427+ self . num_pending_tasks += versions_to_delete. versions . len ( ) ;
428+
429+ for ( collection_id, versions) in & versions_to_delete. versions {
430+ let versions_to_delete = versions
431+ . iter ( )
432+ . filter_map ( |( version, action) | {
433+ if * action == CollectionVersionAction :: Delete {
434+ Some ( * version)
435+ } else {
436+ None
437+ }
438+ } )
439+ . collect :: < Vec < _ > > ( ) ;
440+
441+ let version_file = self
442+ . version_files
443+ . get ( & collection_id)
444+ . expect ( "Version file should be present" ) ; // todo
445+
446+ let delete_versions_task = wrap (
447+ Box :: new ( DeleteVersionsAtSysDbOperator {
448+ storage : self . storage . clone ( ) ,
449+ } ) ,
450+ DeleteVersionsAtSysDbInput {
451+ version_file : version_file. clone ( ) ,
452+ epoch_id : 0 , // todo
453+ sysdb_client : self . sysdb_client . clone ( ) ,
454+ versions_to_delete : VersionListForCollection {
455+ tenant_id : version_file
456+ . collection_info_immutable
457+ . as_ref ( )
458+ . unwrap ( )
459+ . tenant_id
460+ . clone ( ) , // todo
461+ database_id : version_file
462+ . collection_info_immutable
463+ . as_ref ( )
464+ . unwrap ( )
465+ . database_id
466+ . clone ( ) , // todo
467+ collection_id : collection_id. to_string ( ) ,
468+ versions : versions_to_delete,
469+ } ,
470+ unused_s3_files : output. deleted_files . clone ( ) ,
471+ } ,
472+ ctx. receiver ( ) ,
473+ ) ;
474+
475+ if let Err ( e) = self
476+ . dispatcher ( )
477+ . send ( delete_versions_task, Some ( Span :: current ( ) ) )
478+ . await
479+ {
480+ self . terminate_with_result ( Err ( GarbageCollectorError :: Channel ( e) ) , ctx)
481+ . await ;
482+ return ;
483+ }
484+ }
485+ }
486+ }
487+
488+ #[ async_trait]
489+ impl Handler < TaskResult < DeleteVersionsAtSysDbOutput , DeleteVersionsAtSysDbError > >
490+ for GarbageCollectorOrchestrator
491+ {
492+ type Result = ( ) ;
493+
494+ async fn handle (
495+ & mut self ,
496+ message : TaskResult < DeleteVersionsAtSysDbOutput , DeleteVersionsAtSysDbError > ,
497+ ctx : & ComponentContext < GarbageCollectorOrchestrator > ,
498+ ) {
499+ // Stage 6: Final stage - versions deleted, complete the garbage collection process
500+ let _output = match self . ok_or_terminate ( message. into_inner ( ) , ctx) . await {
501+ Some ( output) => output,
502+ None => return ,
503+ } ;
504+
505+ self . num_pending_tasks -= 1 ;
506+ if self . num_pending_tasks == 0 {
507+ let response = GarbageCollectorResponse {
508+ // todo
509+ num_files_deleted : 0 ,
510+ num_versions_deleted : 0 ,
511+ } ;
512+
513+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
514+ }
416515 }
417516}
0 commit comments