@@ -933,7 +933,7 @@ pub async fn raise_if_invalid_snapshot_id(
933933#[ cfg( test) ]
934934#[ allow( clippy:: panic, clippy:: unwrap_used, clippy:: expect_used) ]
935935mod tests {
936- use std:: { collections:: HashMap , error:: Error , path:: PathBuf , sync:: Arc } ;
936+ use std:: { collections:: HashMap , error:: Error , iter :: zip , path:: PathBuf , sync:: Arc } ;
937937
938938 use icechunk_macros:: tokio_test;
939939 use itertools:: enumerate;
@@ -947,13 +947,14 @@ mod tests {
947947 ManifestSplitDim , ManifestSplitDimCondition , ManifestSplittingConfig ,
948948 RepositoryConfig ,
949949 } ,
950+ conflicts:: basic_solver:: BasicConflictSolver ,
950951 format:: {
951952 ByteRange , ChunkIndices ,
952953 manifest:: { ChunkPayload , ManifestSplits } ,
953954 snapshot:: { ArrayShape , DimensionName } ,
954955 } ,
955956 new_local_filesystem_storage,
956- session:: get_chunk,
957+ session:: { SessionError , get_chunk} ,
957958 storage:: new_in_memory_storage,
958959 } ;
959960
@@ -1887,6 +1888,163 @@ mod tests {
18871888 let expected = Bytes :: copy_from_slice ( format ! ( "{0}" , val) . as_bytes ( ) ) ;
18881889 assert_eq ! ( actual, expected) ;
18891890 }
1891+
1892+ // now merge two sessions: one with only writes, one with only deletes
1893+ let mut session1 = repository. writable_session ( "main" ) . await ?;
1894+ session1
1895+ . set_chunk_ref (
1896+ temp_path. clone ( ) ,
1897+ ChunkIndices ( indices[ 0 ] . clone ( ) ) ,
1898+ Some ( ChunkPayload :: Inline ( format ! ( "{0}" , 3 ) . into ( ) ) ) ,
1899+ )
1900+ . await ?;
1901+ session1
1902+ . set_chunk_ref (
1903+ temp_path. clone ( ) ,
1904+ ChunkIndices ( indices[ 1 ] . clone ( ) ) ,
1905+ Some ( ChunkPayload :: Inline ( format ! ( "{0}" , 4 ) . into ( ) ) ) ,
1906+ )
1907+ . await ?;
1908+ let mut session2 = repository. writable_session ( "main" ) . await ?;
1909+ session2
1910+ . set_chunk_ref ( temp_path. clone ( ) , ChunkIndices ( indices[ 2 ] . clone ( ) ) , None )
1911+ . await ?;
1912+ session2
1913+ . set_chunk_ref ( temp_path. clone ( ) , ChunkIndices ( indices[ 3 ] . clone ( ) ) , None )
1914+ . await ?;
1915+
1916+ session1. merge ( session2) . await ?;
1917+ let expected = vec ! [ Some ( 3 ) , Some ( 4 ) , None , None ] ;
1918+ for ( expect, idx) in zip ( expected. iter ( ) , indices. iter ( ) ) {
1919+ let actual = get_chunk (
1920+ session1
1921+ . get_chunk_reader (
1922+ & temp_path,
1923+ & ChunkIndices ( idx. clone ( ) ) ,
1924+ & ByteRange :: ALL ,
1925+ )
1926+ . await
1927+ . unwrap ( ) ,
1928+ )
1929+ . await
1930+ . unwrap ( ) ;
1931+ let expected_value =
1932+ expect. map ( |val| Bytes :: copy_from_slice ( format ! ( "{0}" , val) . as_bytes ( ) ) ) ;
1933+ assert_eq ! ( actual, expected_value) ;
1934+ }
1935+
1936+ Ok ( ( ) )
1937+ }
1938+
1939+ #[ tokio_test]
1940+ async fn test_commits_with_conflicting_manifest_splits ( ) -> Result < ( ) , Box < dyn Error > >
1941+ {
1942+ let shape = ArrayShape :: new ( vec ! [ ( 25 , 1 ) , ( 10 , 1 ) , ( 3 , 1 ) , ( 4 , 1 ) ] ) . unwrap ( ) ;
1943+ let dimension_names = Some ( vec ! [ "t" . into( ) , "z" . into( ) , "y" . into( ) , "x" . into( ) ] ) ;
1944+ let temp_path: Path = "/temperature" . try_into ( ) . unwrap ( ) ;
1945+
1946+ let orig_split_sizes = vec ! [ (
1947+ ManifestSplitCondition :: AnyArray ,
1948+ vec![ ManifestSplitDim {
1949+ condition: ManifestSplitDimCondition :: DimensionName ( "t" . to_string( ) ) ,
1950+ num_chunks: 12u32 ,
1951+ } ] ,
1952+ ) ] ;
1953+ let split_config =
1954+ ManifestSplittingConfig { split_sizes : Some ( orig_split_sizes. clone ( ) ) } ;
1955+ let backend: Arc < dyn Storage + Send + Sync > = new_in_memory_storage ( ) . await ?;
1956+ let repository = create_repo_with_split_manifest_config (
1957+ & temp_path,
1958+ & shape,
1959+ & dimension_names,
1960+ & split_config,
1961+ Some ( backend) ,
1962+ )
1963+ . await ?;
1964+
1965+ let indices =
1966+ vec ! [ vec![ 0 , 0 , 1 , 0 ] , vec![ 0 , 0 , 0 , 0 ] , vec![ 0 , 2 , 0 , 0 ] , vec![ 0 , 2 , 0 , 1 ] ] ;
1967+
1968+ let mut session1 = repository. writable_session ( "main" ) . await ?;
1969+ session1
1970+ . set_chunk_ref (
1971+ temp_path. clone ( ) ,
1972+ ChunkIndices ( indices[ 0 ] . clone ( ) ) ,
1973+ Some ( ChunkPayload :: Inline ( format ! ( "{0}" , 0 ) . into ( ) ) ) ,
1974+ )
1975+ . await ?;
1976+ session1
1977+ . set_chunk_ref (
1978+ temp_path. clone ( ) ,
1979+ ChunkIndices ( indices[ 1 ] . clone ( ) ) ,
1980+ Some ( ChunkPayload :: Inline ( format ! ( "{0}" , 1 ) . into ( ) ) ) ,
1981+ )
1982+ . await ?;
1983+
1984+ let incompatible_size = 11u32 ;
1985+ let incompatible_split_sizes = vec ! [ (
1986+ ManifestSplitCondition :: AnyArray ,
1987+ vec![ ManifestSplitDim {
1988+ condition: ManifestSplitDimCondition :: DimensionName ( "t" . to_string( ) ) ,
1989+ num_chunks: incompatible_size,
1990+ } ] ,
1991+ ) ] ;
1992+ let other_repo = reopen_repo_with_new_splitting_config (
1993+ & repository,
1994+ Some ( incompatible_split_sizes) ,
1995+ ) ;
1996+
1997+ assert_ne ! ( other_repo. config( ) , repository. config( ) ) ;
1998+
1999+ let mut session2 = other_repo. writable_session ( "main" ) . await ?;
2000+ session2
2001+ . set_chunk_ref (
2002+ temp_path. clone ( ) ,
2003+ ChunkIndices ( indices[ 2 ] . clone ( ) ) ,
2004+ Some ( ChunkPayload :: Inline ( format ! ( "{0}" , 2 ) . into ( ) ) ) ,
2005+ )
2006+ . await ?;
2007+ session2
2008+ . set_chunk_ref (
2009+ temp_path. clone ( ) ,
2010+ ChunkIndices ( indices[ 3 ] . clone ( ) ) ,
2011+ Some ( ChunkPayload :: Inline ( format ! ( "{0}" , 3 ) . into ( ) ) ) ,
2012+ )
2013+ . await ?;
2014+
2015+ session1. commit ( "first commit" , None ) . await ?;
2016+ if let Err ( SessionError { kind : SessionErrorKind :: Conflict { .. } , .. } ) =
2017+ session2. commit ( "second commit" , None ) . await
2018+ {
2019+ let solver = BasicConflictSolver :: default ( ) ;
2020+ // different chunks were written so this should fast forward
2021+ assert ! ( session2. rebase( & solver) . await . is_ok( ) ) ;
2022+ session2. commit ( "second commit after rebase" , None ) . await ?;
2023+ } else {
2024+ panic ! ( "this should have conflicted!" ) ;
2025+ }
2026+
2027+ let new_session = repository
2028+ . readonly_session ( & VersionInfo :: BranchTipRef ( "main" . into ( ) ) )
2029+ . await ?;
2030+ for ( val, idx) in enumerate ( indices. iter ( ) ) {
2031+ let actual = get_chunk (
2032+ new_session
2033+ . get_chunk_reader (
2034+ & temp_path,
2035+ & ChunkIndices ( idx. clone ( ) ) ,
2036+ & ByteRange :: ALL ,
2037+ )
2038+ . await
2039+ . unwrap ( ) ,
2040+ )
2041+ . await
2042+ . unwrap ( )
2043+ . expect ( & format ! ( "getting chunk ref failed for {:?}" , & idx) ) ;
2044+ let expected = Bytes :: copy_from_slice ( format ! ( "{0}" , val) . as_bytes ( ) ) ;
2045+ assert_eq ! ( actual, expected) ;
2046+ }
2047+
18902048 Ok ( ( ) )
18912049 }
18922050
0 commit comments