2
2
3
3
use std:: cmp:: Ordering ;
4
4
use std:: collections:: HashMap ;
5
+ use std:: collections:: HashSet ;
5
6
use std:: future:: Future ;
6
- use std:: sync:: Arc ;
7
+ use std:: sync:: { Arc , Mutex } ;
7
8
use std:: time:: { Duration , Instant , SystemTime } ;
8
9
9
10
use bytes:: Bytes ;
@@ -264,6 +265,7 @@ pub struct Rollup {
264
265
pub advance_to : LogPosition ,
265
266
pub reinsert : Vec < DirtyMarker > ,
266
267
pub compactable : Vec < CollectionInfo > ,
268
+ pub backpressure : Vec < CollectionUuid > ,
267
269
}
268
270
269
271
//////////////////////////////////////// RollupPerCollection ///////////////////////////////////////
@@ -361,6 +363,7 @@ impl DirtyMarker {
361
363
retrieve_cursor : impl Fn ( Arc < Storage > , CollectionUuid ) -> F2 ,
362
364
markers : & [ ( LogPosition , DirtyMarker ) ] ,
363
365
record_count_threshold : u64 ,
366
+ record_count_backpressure : u64 ,
364
367
reinsert_threshold : u64 ,
365
368
timeout_us : u64 ,
366
369
metrics : & Metrics ,
@@ -424,7 +427,6 @@ impl DirtyMarker {
424
427
} )
425
428
. map (
426
429
|( collection_id, storage, retrieve_manifest, retrieve_cursor) | async move {
427
- // We play a funny game of Ok(Ok(_)) to force try_join_all to not short-circuit.
428
430
let cursor = ( * retrieve_cursor) ( Arc :: clone ( & storage) , collection_id) ;
429
431
let manifest = ( * retrieve_manifest) ( Arc :: clone ( & storage) , collection_id) ;
430
432
let ( cursor, manifest) = futures:: future:: join ( cursor, manifest) . await ;
@@ -454,6 +456,7 @@ impl DirtyMarker {
454
456
. flat_map ( Result :: ok)
455
457
. collect :: < HashMap < _ , _ > > ( ) ;
456
458
let mut uncompacted = 0u64 ;
459
+ let mut backpressure = vec ! [ ] ;
457
460
let compactable = compactable
458
461
. into_iter ( )
459
462
. filter_map ( |collection_id| {
@@ -494,6 +497,9 @@ impl DirtyMarker {
494
497
manifest. maximum_log_position( ) ,
495
498
) ;
496
499
uncompacted += maximum_log_position - cursor. position ;
500
+ if maximum_log_position - cursor. position >= record_count_backpressure {
501
+ backpressure. push ( * collection_id) ;
502
+ }
497
503
if maximum_log_position - cursor. position >= record_count_threshold {
498
504
Some ( CollectionInfo {
499
505
collection_id : collection_id. to_string ( ) ,
@@ -542,6 +548,7 @@ impl DirtyMarker {
542
548
advance_to,
543
549
reinsert,
544
550
compactable,
551
+ backpressure,
545
552
} ) )
546
553
}
547
554
@@ -631,10 +638,32 @@ pub struct LogServer {
631
638
open_logs : Arc < StateHashTable < LogKey , LogStub > > ,
632
639
dirty_log : Arc < LogWriter > ,
633
640
compacting : tokio:: sync:: Mutex < ( ) > ,
641
+ backpressure : Mutex < Arc < HashSet < CollectionUuid > > > ,
634
642
cache : Option < Box < dyn chroma_cache:: PersistentCache < String , CachedParquetFragment > > > ,
635
643
metrics : Metrics ,
636
644
}
637
645
646
+ impl LogServer {
647
+ fn set_backpressure ( & self , to_pressure : & [ CollectionUuid ] ) {
648
+ let mut new_backpressure = Arc :: new ( HashSet :: from_iter ( to_pressure. iter ( ) . cloned ( ) ) ) ;
649
+ // SAFETY(rescrv): Mutex poisoning.
650
+ let mut backpressure = self . backpressure . lock ( ) . unwrap ( ) ;
651
+ std:: mem:: swap ( & mut * backpressure, & mut new_backpressure) ;
652
+ }
653
+
654
+ fn check_for_backpressure ( & self , collection_id : CollectionUuid ) -> Result < ( ) , Status > {
655
+ let backpressure = {
656
+ // SAFETY(rescrv): Mutex poisoning.
657
+ let backpressure = self . backpressure . lock ( ) . unwrap ( ) ;
658
+ Arc :: clone ( & backpressure)
659
+ } ;
660
+ if backpressure. contains ( & collection_id) {
661
+ return Err ( Status :: resource_exhausted ( "log needs compaction; too full" ) ) ;
662
+ }
663
+ Ok ( ( ) )
664
+ }
665
+ }
666
+
638
667
#[ async_trait:: async_trait]
639
668
impl LogService for LogServer {
640
669
async fn push_logs (
@@ -652,6 +681,7 @@ impl LogService for LogServer {
652
681
if push_logs. records . is_empty ( ) {
653
682
return Err ( Status :: invalid_argument ( "Too few records" ) ) ;
654
683
}
684
+ self . check_for_backpressure ( collection_id) ?;
655
685
let span = tracing:: info_span!( "push_logs" ) ;
656
686
657
687
async move {
@@ -832,6 +862,7 @@ impl LogService for LogServer {
832
862
let source_collection_id = Uuid :: parse_str ( & request. source_collection_id )
833
863
. map ( CollectionUuid )
834
864
. map_err ( |_| Status :: invalid_argument ( "Failed to parse collection id" ) ) ?;
865
+ self . check_for_backpressure ( source_collection_id) ?;
835
866
let target_collection_id = Uuid :: parse_str ( & request. target_collection_id )
836
867
. map ( CollectionUuid )
837
868
. map_err ( |_| Status :: invalid_argument ( "Failed to parse collection id" ) ) ?;
@@ -1000,6 +1031,7 @@ impl LogService for LogServer {
1000
1031
self . config . record_count_threshold ,
1001
1032
request. min_compaction_size ,
1002
1033
) ,
1034
+ self . config . num_records_before_backpressure ,
1003
1035
self . config . reinsert_threshold ,
1004
1036
self . config . timeout_us ,
1005
1037
& self . metrics ,
@@ -1020,6 +1052,7 @@ impl LogService for LogServer {
1020
1052
. map_err ( |err| Status :: unavailable ( err. to_string ( ) ) )
1021
1053
} )
1022
1054
. collect :: < Result < Vec < _ > , _ > > ( ) ?;
1055
+ self . set_backpressure ( & rollup. backpressure ) ;
1023
1056
if rollup. advance_to < cursor. position {
1024
1057
tracing:: error!(
1025
1058
"advance_to went back in time: {:?} -> {:?}" ,
@@ -1409,6 +1442,8 @@ pub struct LogServerConfig {
1409
1442
pub cache : Option < CacheConfig > ,
1410
1443
#[ serde( default = "LogServerConfig::default_record_count_threshold" ) ]
1411
1444
pub record_count_threshold : u64 ,
1445
+ #[ serde( default = "LogServerConfig::default_num_records_before_backpressure" ) ]
1446
+ pub num_records_before_backpressure : u64 ,
1412
1447
#[ serde( default = "LogServerConfig::default_reinsert_threshold" ) ]
1413
1448
pub reinsert_threshold : u64 ,
1414
1449
#[ serde( default = "LogServerConfig::default_timeout_us" ) ]
@@ -1421,6 +1456,11 @@ impl LogServerConfig {
1421
1456
100
1422
1457
}
1423
1458
1459
/// Serde default for `num_records_before_backpressure`: one million records on the log.
fn default_num_records_before_backpressure() -> u64 {
    const ONE_MILLION_RECORDS: u64 = 1_000_000;
    ONE_MILLION_RECORDS
}
1463
+
1424
1464
/// force compaction if a candidate comes up ten times.
1425
1465
fn default_reinsert_threshold ( ) -> u64 {
1426
1466
10
@@ -1442,6 +1482,7 @@ impl Default for LogServerConfig {
1442
1482
reader : LogReaderOptions :: default ( ) ,
1443
1483
cache : None ,
1444
1484
record_count_threshold : Self :: default_record_count_threshold ( ) ,
1485
+ num_records_before_backpressure : Self :: default_num_records_before_backpressure ( ) ,
1445
1486
reinsert_threshold : Self :: default_reinsert_threshold ( ) ,
1446
1487
timeout_us : Self :: default_timeout_us ( ) ,
1447
1488
}
@@ -1483,12 +1524,14 @@ impl Configurable<LogServerConfig> for LogServer {
1483
1524
let dirty_log = Arc :: new ( dirty_log) ;
1484
1525
let compacting = tokio:: sync:: Mutex :: new ( ( ) ) ;
1485
1526
let metrics = Metrics :: new ( opentelemetry:: global:: meter ( "chroma" ) ) ;
1527
+ let backpressure = Mutex :: new ( Arc :: new ( HashSet :: default ( ) ) ) ;
1486
1528
Ok ( Self {
1487
1529
config : config. clone ( ) ,
1488
1530
open_logs : Arc :: new ( StateHashTable :: default ( ) ) ,
1489
1531
storage,
1490
1532
dirty_log,
1491
1533
compacting,
1534
+ backpressure,
1492
1535
cache,
1493
1536
metrics,
1494
1537
} )
@@ -1592,6 +1635,7 @@ mod tests {
1592
1635
& markers,
1593
1636
1 ,
1594
1637
1 ,
1638
+ 1 ,
1595
1639
86_400_000_000 ,
1596
1640
& Metrics :: new ( opentelemetry:: global:: meter ( "chroma" ) ) ,
1597
1641
)
@@ -1654,6 +1698,7 @@ mod tests {
1654
1698
& markers,
1655
1699
3 ,
1656
1700
1 ,
1701
+ 1 ,
1657
1702
86_400_000_000 ,
1658
1703
& Metrics :: new( opentelemetry:: global:: meter( "chroma" ) ) ,
1659
1704
)
@@ -1738,6 +1783,7 @@ mod tests {
1738
1783
& markers,
1739
1784
3 ,
1740
1785
1 ,
1786
+ 1 ,
1741
1787
86_400_000_000 ,
1742
1788
& Metrics :: new ( opentelemetry:: global:: meter ( "chroma" ) ) ,
1743
1789
)
@@ -1852,6 +1898,7 @@ mod tests {
1852
1898
& markers,
1853
1899
3 ,
1854
1900
1 ,
1901
+ 1 ,
1855
1902
86_400_000_000 ,
1856
1903
& Metrics :: new ( opentelemetry:: global:: meter ( "chroma" ) ) ,
1857
1904
)
@@ -1864,6 +1911,76 @@ mod tests {
1864
1911
assert ! ( rollup. reinsert[ 0 ] . collection_id( ) == collection_id_blocking) ;
1865
1912
}
1866
1913
1914
+ #[ tokio:: test]
1915
+ async fn dirty_marker_backpressure ( ) {
1916
+ // Test that the dirty marker gives proper backpressure.
1917
+ let storage = chroma_storage:: test_storage ( ) ;
1918
+ let now = SystemTime :: now ( )
1919
+ . duration_since ( SystemTime :: UNIX_EPOCH )
1920
+ . map_err ( |_| wal3:: Error :: Internal )
1921
+ . unwrap ( )
1922
+ . as_micros ( ) as u64 ;
1923
+ let collection_id = CollectionUuid :: new ( ) ;
1924
+ let markers = vec ! [ (
1925
+ LogPosition :: from_offset( 1 ) ,
1926
+ DirtyMarker :: MarkDirty {
1927
+ collection_id,
1928
+ log_position: LogPosition :: from_offset( 1 ) ,
1929
+ num_records: 1_000_000 ,
1930
+ reinsert_count: 0 ,
1931
+ initial_insertion_epoch_us: now,
1932
+ } ,
1933
+ ) ] ;
1934
+ let rollup = DirtyMarker :: rollup (
1935
+ Arc :: new ( storage) ,
1936
+ |_, collection_id| async move {
1937
+ if collection_id == collection_id {
1938
+ Ok ( Some ( Manifest {
1939
+ writer : "TODO" . to_string ( ) ,
1940
+ acc_bytes : 0 ,
1941
+ setsum : Setsum :: default ( ) ,
1942
+ snapshots : vec ! [ ] ,
1943
+ fragments : vec ! [ Fragment {
1944
+ seq_no: FragmentSeqNo ( 1 ) ,
1945
+ num_bytes: 0 ,
1946
+ path: "TODO" . to_string( ) ,
1947
+ setsum: Setsum :: default ( ) ,
1948
+ start: LogPosition :: from_offset( 1 ) ,
1949
+ limit: LogPosition :: from_offset( 1_000_001 ) ,
1950
+ } ] ,
1951
+ } ) )
1952
+ } else {
1953
+ unreachable ! ( "we aren't testing this case" ) ;
1954
+ }
1955
+ } ,
1956
+ |_, collection_id| async move {
1957
+ if collection_id == collection_id {
1958
+ Ok ( Some ( Witness :: default_etag_with_cursor ( Cursor {
1959
+ position : LogPosition :: from_offset ( 1 ) ,
1960
+ epoch_us : 0 ,
1961
+ writer : "TODO" . to_string ( ) ,
1962
+ } ) ) )
1963
+ } else {
1964
+ unreachable ! ( "we aren't testing this case" ) ;
1965
+ }
1966
+ } ,
1967
+ & markers,
1968
+ 1 ,
1969
+ 1 ,
1970
+ 1 ,
1971
+ 86_400_000_000 ,
1972
+ & Metrics :: new ( opentelemetry:: global:: meter ( "chroma" ) ) ,
1973
+ )
1974
+ . await
1975
+ . unwrap ( )
1976
+ . unwrap ( ) ;
1977
+ assert_eq ! ( LogPosition :: from_offset( 2 ) , rollup. advance_to) ;
1978
+ assert_eq ! ( 1 , rollup. compactable. len( ) ) ;
1979
+ assert_eq ! ( 1 , rollup. reinsert. len( ) ) ;
1980
+ assert_eq ! ( 1 , rollup. backpressure. len( ) ) ;
1981
+ assert_eq ! ( collection_id, rollup. backpressure[ 0 ] ) ;
1982
+ }
1983
+
1867
1984
#[ test]
1868
1985
fn unsafe_constants ( ) {
1869
1986
assert ! ( STABLE_PREFIX . is_valid( ) ) ;
0 commit comments