@@ -9,6 +9,10 @@ use crate::operators::delete_unused_files::{
9
9
DeleteUnusedFilesError , DeleteUnusedFilesInput , DeleteUnusedFilesOperator ,
10
10
DeleteUnusedFilesOutput ,
11
11
} ;
12
+ use crate :: operators:: delete_versions_at_sysdb:: {
13
+ DeleteVersionsAtSysDbError , DeleteVersionsAtSysDbInput , DeleteVersionsAtSysDbOperator ,
14
+ DeleteVersionsAtSysDbOutput ,
15
+ } ;
12
16
use crate :: operators:: list_files_at_version:: {
13
17
ListFilesAtVersionError , ListFilesAtVersionInput , ListFilesAtVersionOutput ,
14
18
ListFilesAtVersionsOperator ,
@@ -23,7 +27,7 @@ use chroma_system::{
23
27
wrap, ChannelError , ComponentContext , ComponentHandle , Dispatcher , Handler , Orchestrator ,
24
28
PanicError , System , TaskError , TaskMessage , TaskResult ,
25
29
} ;
26
- use chroma_types:: chroma_proto:: CollectionVersionFile ;
30
+ use chroma_types:: chroma_proto:: { CollectionVersionFile , VersionListForCollection } ;
27
31
use chroma_types:: CollectionUuid ;
28
32
use chrono:: { DateTime , Utc } ;
29
33
use std:: collections:: { HashMap , HashSet } ;
@@ -43,15 +47,12 @@ pub struct GarbageCollectorOrchestrator {
43
47
storage : Storage ,
44
48
root_manager : RootManager ,
45
49
result_channel : Option < Sender < Result < GarbageCollectorResponse , GarbageCollectorError > > > ,
46
- pending_version_file : Option < CollectionVersionFile > ,
47
- pending_versions_to_delete : Option < chroma_types:: chroma_proto:: VersionListForCollection > ,
48
50
pending_epoch_id : Option < i64 > ,
49
- num_versions_deleted : u32 ,
50
- deletion_list : Vec < String > ,
51
51
cleanup_mode : CleanupMode ,
52
52
version_files : HashMap < CollectionUuid , CollectionVersionFile > ,
53
53
versions_to_delete_output : Option < ComputeVersionsToDeleteOutput > ,
54
54
file_ref_counts : HashMap < String , u32 > ,
55
+ num_pending_tasks : usize ,
55
56
}
56
57
57
58
impl Debug for GarbageCollectorOrchestrator {
@@ -60,13 +61,10 @@ impl Debug for GarbageCollectorOrchestrator {
60
61
}
61
62
}
62
63
63
- #[ allow( dead_code) ]
64
64
/// Summary returned through the orchestrator's result channel when a garbage
/// collection run terminates successfully.
#[derive(Debug)]
pub struct GarbageCollectorResponse {
    /// Count of collection versions deleted by the run.
    pub num_versions_deleted: u32,
    /// Count of files deleted by the run.
    pub num_files_deleted: u32,
}
71
69
72
70
#[ allow( clippy:: too_many_arguments) ]
@@ -93,14 +91,11 @@ impl GarbageCollectorOrchestrator {
93
91
root_manager,
94
92
cleanup_mode,
95
93
result_channel : None ,
96
- pending_version_file : None ,
97
- pending_versions_to_delete : None ,
98
94
pending_epoch_id : None ,
99
- num_versions_deleted : 0 ,
100
- deletion_list : Vec :: new ( ) ,
101
95
version_files : HashMap :: new ( ) ,
102
96
file_ref_counts : HashMap :: new ( ) ,
103
97
versions_to_delete_output : None ,
98
+ num_pending_tasks : 0 ,
104
99
}
105
100
}
106
101
}
@@ -126,6 +121,8 @@ pub enum GarbageCollectorError {
126
121
ListFilesAtVersion ( #[ from] ListFilesAtVersionError ) ,
127
122
#[ error( "Failed to delete unused files: {0}" ) ]
128
123
DeleteUnusedFiles ( #[ from] DeleteUnusedFilesError ) ,
124
+ #[ error( "Failed to delete versions at sysdb: {0}" ) ]
125
+ DeleteVersionsAtSysDb ( #[ from] DeleteVersionsAtSysDbError ) ,
129
126
}
130
127
131
128
impl ChromaError for GarbageCollectorError {
@@ -413,5 +410,107 @@ impl Handler<TaskResult<DeleteUnusedFilesOutput, DeleteUnusedFilesError>>
413
410
Some ( output) => output,
414
411
None => return ,
415
412
} ;
413
+
414
+ if self . cleanup_mode == CleanupMode :: DryRun {
415
+ tracing:: info!( "Dry run mode, skipping actual deletion" ) ;
416
+ let response = GarbageCollectorResponse {
417
+ num_versions_deleted : 0 ,
418
+ num_files_deleted : 0 ,
419
+ } ;
420
+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
421
+ return ;
422
+ }
423
+
424
+ // todo: was previously mutated?
425
+ let versions_to_delete = self . versions_to_delete_output . as_ref ( ) . unwrap ( ) ;
426
+
427
+ self . num_pending_tasks += versions_to_delete. versions . len ( ) ;
428
+
429
+ for ( collection_id, versions) in & versions_to_delete. versions {
430
+ let versions_to_delete = versions
431
+ . iter ( )
432
+ . filter_map ( |( version, action) | {
433
+ if * action == CollectionVersionAction :: Delete {
434
+ Some ( * version)
435
+ } else {
436
+ None
437
+ }
438
+ } )
439
+ . collect :: < Vec < _ > > ( ) ;
440
+
441
+ let version_file = self
442
+ . version_files
443
+ . get ( & collection_id)
444
+ . expect ( "Version file should be present" ) ; // todo
445
+
446
+ let delete_versions_task = wrap (
447
+ Box :: new ( DeleteVersionsAtSysDbOperator {
448
+ storage : self . storage . clone ( ) ,
449
+ } ) ,
450
+ DeleteVersionsAtSysDbInput {
451
+ version_file : version_file. clone ( ) ,
452
+ epoch_id : 0 , // todo
453
+ sysdb_client : self . sysdb_client . clone ( ) ,
454
+ versions_to_delete : VersionListForCollection {
455
+ tenant_id : version_file
456
+ . collection_info_immutable
457
+ . as_ref ( )
458
+ . unwrap ( )
459
+ . tenant_id
460
+ . clone ( ) , // todo
461
+ database_id : version_file
462
+ . collection_info_immutable
463
+ . as_ref ( )
464
+ . unwrap ( )
465
+ . database_id
466
+ . clone ( ) , // todo
467
+ collection_id : collection_id. to_string ( ) ,
468
+ versions : versions_to_delete,
469
+ } ,
470
+ unused_s3_files : output. deleted_files . clone ( ) ,
471
+ } ,
472
+ ctx. receiver ( ) ,
473
+ ) ;
474
+
475
+ if let Err ( e) = self
476
+ . dispatcher ( )
477
+ . send ( delete_versions_task, Some ( Span :: current ( ) ) )
478
+ . await
479
+ {
480
+ self . terminate_with_result ( Err ( GarbageCollectorError :: Channel ( e) ) , ctx)
481
+ . await ;
482
+ return ;
483
+ }
484
+ }
485
+ }
486
+ }
487
+
488
+ #[ async_trait]
489
+ impl Handler < TaskResult < DeleteVersionsAtSysDbOutput , DeleteVersionsAtSysDbError > >
490
+ for GarbageCollectorOrchestrator
491
+ {
492
+ type Result = ( ) ;
493
+
494
+ async fn handle (
495
+ & mut self ,
496
+ message : TaskResult < DeleteVersionsAtSysDbOutput , DeleteVersionsAtSysDbError > ,
497
+ ctx : & ComponentContext < GarbageCollectorOrchestrator > ,
498
+ ) {
499
+ // Stage 6: Final stage - versions deleted, complete the garbage collection process
500
+ let _output = match self . ok_or_terminate ( message. into_inner ( ) , ctx) . await {
501
+ Some ( output) => output,
502
+ None => return ,
503
+ } ;
504
+
505
+ self . num_pending_tasks -= 1 ;
506
+ if self . num_pending_tasks == 0 {
507
+ let response = GarbageCollectorResponse {
508
+ // todo
509
+ num_files_deleted : 0 ,
510
+ num_versions_deleted : 0 ,
511
+ } ;
512
+
513
+ self . terminate_with_result ( Ok ( response) , ctx) . await ;
514
+ }
416
515
}
417
516
}
0 commit comments