@@ -170,6 +170,10 @@ pub enum SessionErrorKind {
170170 NoAmendForInitialCommit ,
171171 #[ error( "failed to create manifest from chunk stream" ) ]
172172 ManifestCreationError ( #[ from] Box < SessionError > ) ,
173+ #[ error(
174+ "inconsistent manifests detected: Snapshot will reference {snapshot} manifest, while array nodes will reference {nodes} manifests"
175+ ) ]
176+ ManifestsInconsistencyError { snapshot : usize , nodes : usize } ,
173177 #[ error( "failed to merge sessions: {0}" ) ]
174178 SessionMerge ( String ) ,
175179 #[ error( "byte range {request:?} is out of bounds for chunk of length {chunk_length}" ) ]
@@ -2851,15 +2855,22 @@ async fn do_flush(
28512855 }
28522856
28532857 // manifest_files & manifest_refs _must_ be consistent
2854- debug_assert_eq ! (
2855- flush_data. manifest_files. iter( ) . map( |x| x. id. clone( ) ) . collect:: <HashSet <_>>( ) ,
2856- flush_data
2857- . manifest_refs
2858- . values( )
2859- . flatten( )
2860- . map( |x| x. object_id. clone( ) )
2861- . collect:: <HashSet <_>>( ) ,
2862- ) ;
2858+ let mfiles =
2859+ flush_data. manifest_files . iter ( ) . map ( |x| x. id . clone ( ) ) . collect :: < HashSet < _ > > ( ) ;
2860+ let mrefs = flush_data
2861+ . manifest_refs
2862+ . values ( )
2863+ . flatten ( )
2864+ . map ( |x| x. object_id . clone ( ) )
2865+ . collect :: < HashSet < _ > > ( ) ;
2866+ if mfiles != mrefs {
2867+ return Err ( SessionError :: capture (
2868+ SessionErrorKind :: ManifestsInconsistencyError {
2869+ snapshot : mfiles. len ( ) ,
2870+ nodes : mrefs. len ( ) ,
2871+ } ,
2872+ ) ) ;
2873+ }
28632874
28642875 trace ! ( "Building new snapshot" ) ;
28652876 // gather and sort nodes:
@@ -6660,6 +6671,53 @@ mod tests {
66606671 Ok ( ( ) )
66616672 }
66626673
6674+ #[ tokio_test]
6675+ async fn manifest_files_consistency ( ) -> Result < ( ) , Box < dyn Error > > {
6676+ let repo = Arc :: new ( create_memory_store_repository ( SpecVersionBin :: V2 ) . await ) ;
6677+ let mut session = repo. writable_session ( "main" ) . await ?;
6678+ session
6679+ . add_array ( "/array" . try_into ( ) . unwrap ( ) , basic_shape ( ) , None , Bytes :: new ( ) )
6680+ . await ?;
6681+
6682+ let properties = Default :: default ( ) ;
6683+ let manifest_config = ManifestConfig :: default ( ) ;
6684+ let mut flush_data = FlushProcess :: new (
6685+ Arc :: clone ( & session. asset_manager ) ,
6686+ & session. change_set ,
6687+ & Snapshot :: INITIAL_SNAPSHOT_ID ,
6688+ & manifest_config,
6689+ ) ;
6690+
6691+ // poke at flush_data to make inconsistent
6692+ flush_data. manifest_files . insert ( ManifestFileInfo {
6693+ id : ManifestId :: random ( ) ,
6694+ num_chunk_refs : 0 ,
6695+ size_bytes : 9000 ,
6696+ } ) ;
6697+
6698+ let res = do_flush (
6699+ flush_data,
6700+ "fail" ,
6701+ 1 ,
6702+ properties,
6703+ false ,
6704+ CommitMethod :: NewCommit ,
6705+ manifest_config. splitting ( ) ,
6706+ )
6707+ . await ;
6708+
6709+ // verify it returns the right error
6710+ assert ! ( matches!(
6711+ res,
6712+ Err ( SessionError {
6713+ kind: SessionErrorKind :: ManifestsInconsistencyError { snapshot, nodes } ,
6714+ ..
6715+ } ) if snapshot == 1 && nodes == 0
6716+ ) ) ;
6717+
6718+ Ok ( ( ) )
6719+ }
6720+
66636721 #[ test]
66646722 fn test_construct_valid_byte_range ( ) {
66656723 // chunk at offset 100, length 50 → valid absolute range is [100, 150]
0 commit comments