@@ -44,6 +44,7 @@ use super::utils::to_records;
44
44
45
45
#[ derive( Debug ) ]
46
46
struct Metrics {
47
+ fork_retries_counter : Counter < u64 > ,
47
48
delete_retries_counter : Counter < u64 > ,
48
49
count_retries_counter : Counter < u64 > ,
49
50
query_retries_counter : Counter < u64 > ,
@@ -73,11 +74,13 @@ impl ServiceBasedFrontend {
73
74
default_knn_index : KnnIndex ,
74
75
) -> Self {
75
76
let meter = global:: meter ( "chroma" ) ;
77
+ let fork_retries_counter = meter. u64_counter ( "fork_retries" ) . build ( ) ;
76
78
let delete_retries_counter = meter. u64_counter ( "delete_retries" ) . build ( ) ;
77
79
let count_retries_counter = meter. u64_counter ( "count_retries" ) . build ( ) ;
78
80
let query_retries_counter = meter. u64_counter ( "query_retries" ) . build ( ) ;
79
81
let get_retries_counter = meter. u64_counter ( "query_retries" ) . build ( ) ;
80
82
let metrics = Arc :: new ( Metrics {
83
+ fork_retries_counter,
81
84
delete_retries_counter,
82
85
count_retries_counter,
83
86
query_retries_counter,
@@ -543,7 +546,7 @@ impl ServiceBasedFrontend {
543
546
Ok ( DeleteCollectionRecordsResponse { } )
544
547
}
545
548
546
- pub async fn fork_collection (
549
+ pub async fn retryable_fork (
547
550
& mut self ,
548
551
ForkCollectionRequest {
549
552
source_collection_id,
@@ -556,6 +559,9 @@ impl ServiceBasedFrontend {
556
559
. sysdb_client
557
560
. fork_collection (
558
561
source_collection_id,
562
+ // TODO: Update this when wiring up log fork
563
+ 0 ,
564
+ 0 ,
559
565
target_collection_id,
560
566
target_collection_name,
561
567
)
@@ -570,6 +576,42 @@ impl ServiceBasedFrontend {
570
576
Ok ( collection)
571
577
}
572
578
579
+ pub async fn fork_collection (
580
+ & mut self ,
581
+ request : ForkCollectionRequest ,
582
+ ) -> Result < ForkCollectionResponse , ForkCollectionError > {
583
+ let retries = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
584
+ let fork_to_retry = || {
585
+ let mut self_clone = self . clone ( ) ;
586
+ let request_clone = request. clone ( ) ;
587
+ async move { self_clone. retryable_fork ( request_clone) . await }
588
+ } ;
589
+
590
+ let res = fork_to_retry
591
+ . retry ( self . collections_with_segments_provider . get_retry_backoff ( ) )
592
+ // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried
593
+ . when ( |e| {
594
+ matches ! (
595
+ e. code( ) ,
596
+ ErrorCodes :: FailedPrecondition | ErrorCodes :: NotFound | ErrorCodes :: Unknown
597
+ )
598
+ } )
599
+ . notify ( |_, _| {
600
+ let retried = retries. fetch_add ( 1 , Ordering :: Relaxed ) ;
601
+ if retried > 0 {
602
+ tracing:: info!(
603
+ "Retrying fork() request for collection {}" ,
604
+ request. source_collection_id
605
+ ) ;
606
+ }
607
+ } )
608
+ . await ;
609
+ self . metrics
610
+ . fork_retries_counter
611
+ . add ( retries. load ( Ordering :: Relaxed ) as u64 , & [ ] ) ;
612
+ res
613
+ }
614
+
573
615
pub async fn add (
574
616
& mut self ,
575
617
AddCollectionRecordsRequest {
0 commit comments