@@ -13,10 +13,10 @@ use chroma_storage::config::StorageConfig;
13
13
use chroma_storage:: Storage ;
14
14
use chroma_types:: chroma_proto:: {
15
15
log_service_server:: LogService , CollectionInfo , GetAllCollectionInfoToCompactRequest ,
16
- GetAllCollectionInfoToCompactResponse , LogRecord , OperationRecord , PullLogsRequest ,
17
- PullLogsResponse , PurgeDirtyForCollectionRequest , PurgeDirtyForCollectionResponse ,
18
- PushLogsRequest , PushLogsResponse , ScoutLogsRequest , ScoutLogsResponse ,
19
- UpdateCollectionLogOffsetRequest , UpdateCollectionLogOffsetResponse ,
16
+ GetAllCollectionInfoToCompactResponse , InspectDirtyLogRequest , InspectDirtyLogResponse ,
17
+ LogRecord , OperationRecord , PullLogsRequest , PullLogsResponse , PurgeDirtyForCollectionRequest ,
18
+ PurgeDirtyForCollectionResponse , PushLogsRequest , PushLogsResponse , ScoutLogsRequest ,
19
+ ScoutLogsResponse , UpdateCollectionLogOffsetRequest , UpdateCollectionLogOffsetResponse ,
20
20
} ;
21
21
use chroma_types:: chroma_proto:: { ForkLogsRequest , ForkLogsResponse } ;
22
22
use chroma_types:: CollectionUuid ;
@@ -1080,6 +1080,65 @@ impl LogService for LogServer {
1080
1080
. map_err ( |err| Status :: new ( err. code ( ) . into ( ) , err. to_string ( ) ) ) ?;
1081
1081
Ok ( Response :: new ( PurgeDirtyForCollectionResponse { } ) )
1082
1082
}
1083
+
1084
+ #[ tracing:: instrument( skip( self , _request) ) ]
1085
+ async fn inspect_dirty_log (
1086
+ & self ,
1087
+ _request : Request < InspectDirtyLogRequest > ,
1088
+ ) -> Result < Response < InspectDirtyLogResponse > , Status > {
1089
+ let Some ( reader) = self . dirty_log . reader ( LogReaderOptions :: default ( ) ) else {
1090
+ return Err ( Status :: unavailable ( "Failed to get dirty log reader" ) ) ;
1091
+ } ;
1092
+ let Some ( cursors) = self . dirty_log . cursors ( CursorStoreOptions :: default ( ) ) else {
1093
+ return Err ( Status :: unavailable ( "Failed to get dirty log cursors" ) ) ;
1094
+ } ;
1095
+ let witness = match cursors. load ( & STABLE_PREFIX ) . await {
1096
+ Ok ( witness) => witness,
1097
+ Err ( err) => {
1098
+ return Err ( Status :: new ( err. code ( ) . into ( ) , err. to_string ( ) ) ) ;
1099
+ }
1100
+ } ;
1101
+ let default = Cursor :: default ( ) ;
1102
+ let cursor = witness. as_ref ( ) . map ( |w| w. cursor ( ) ) . unwrap_or ( & default) ;
1103
+ tracing:: info!( "cursoring from {cursor:?}" ) ;
1104
+ let dirty_fragments = reader
1105
+ . scan (
1106
+ cursor. position ,
1107
+ Limits {
1108
+ max_files : Some ( 1_000_000 ) ,
1109
+ max_bytes : Some ( 1_000_000_000 ) ,
1110
+ } ,
1111
+ )
1112
+ . await
1113
+ . map_err ( |err| Status :: new ( err. code ( ) . into ( ) , err. to_string ( ) ) ) ?;
1114
+ let dirty_futures = dirty_fragments
1115
+ . iter ( )
1116
+ . map ( |fragment| reader. read_parquet ( fragment) )
1117
+ . collect :: < Vec < _ > > ( ) ;
1118
+ let dirty_raw = futures:: future:: try_join_all ( dirty_futures)
1119
+ . await
1120
+ . map_err ( |err| {
1121
+ Status :: new (
1122
+ err. code ( ) . into ( ) ,
1123
+ format ! ( "Failed to fetch dirty parquet: {}" , err) ,
1124
+ )
1125
+ } ) ?;
1126
+ let mut markers = vec ! [ ] ;
1127
+ for ( _, records, _) in dirty_raw {
1128
+ let records = records
1129
+ . into_iter ( )
1130
+ . map ( |x| String :: from_utf8 ( x. 1 ) )
1131
+ . collect :: < Result < Vec < _ > , _ > > ( )
1132
+ . map_err ( |err| {
1133
+ Status :: new (
1134
+ chroma_error:: ErrorCodes :: DataLoss . into ( ) ,
1135
+ format ! ( "Failed to extract records: {}" , err) ,
1136
+ )
1137
+ } ) ?;
1138
+ markers. extend ( records) ;
1139
+ }
1140
+ Ok ( Response :: new ( InspectDirtyLogResponse { markers } ) )
1141
+ }
1083
1142
}
1084
1143
1085
1144
fn parquet_to_records ( parquet : Arc < Vec < u8 > > ) -> Result < Vec < ( LogPosition , Vec < u8 > ) > , Status > {
0 commit comments