Skip to content

Commit 660fa42

Browse files
committed
Add test for splits changing in a session
1 parent cad2766 commit 660fa42

2 files changed

Lines changed: 130 additions & 4 deletions

File tree

icechunk/src/repository.rs

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,7 +1262,8 @@ mod tests {
12621262
assert_manifest_count(&storage, 1).await;
12631263

12641264
// Important we are not issuing any chunk deletes here (which is what Zarr does)
1265-
// Note we are still rewriting the manifest
1265+
// Note we are still rewriting the manifest even without chunk changes
1266+
// GH604
12661267
let mut session = repo.writable_session("main").await?;
12671268
let shape2 = ArrayShape::new(vec![(2, 1)]).unwrap();
12681269
session
@@ -1291,12 +1292,132 @@ mod tests {
12911292
session.commit("second commit", None).await?;
12921293
assert_manifest_count(&storage, 2).await;
12931294

1294-
// FIXME: add more complex splitting test
1295+
Ok(())
1296+
}
1297+
1298+
#[tokio_test]
1299+
async fn test_splits_change_in_session() -> Result<(), Box<dyn Error>> {
1300+
let shape = ArrayShape::new(vec![(13, 1), (2, 1), (1, 1)]).unwrap();
1301+
let dimension_names = Some(vec!["t".into(), "y".into(), "x".into()]);
1302+
let new_dimension_names = Some(vec!["time".into(), "y".into(), "x".into()]);
1303+
let array_path: Path = "/temperature".try_into().unwrap();
1304+
let array_def = Bytes::from_static(br#"{"this":"other array"}"#);
1305+
1306+
// two possible split sizes t: 3, time: 4;
1307+
// then we rename `t` to `time` 😈
1308+
let split_sizes = vec![
1309+
(
1310+
ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
1311+
vec![ManifestSplitDim {
1312+
condition: ManifestSplitDimCondition::DimensionName(
1313+
"^t$".to_string(),
1314+
),
1315+
num_chunks: 3,
1316+
}],
1317+
),
1318+
(
1319+
ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
1320+
vec![ManifestSplitDim {
1321+
condition: ManifestSplitDimCondition::DimensionName(
1322+
"time".to_string(),
1323+
),
1324+
num_chunks: 4,
1325+
}],
1326+
),
1327+
];
1328+
let split_config = ManifestSplittingConfig { split_sizes: Some(split_sizes) };
1329+
1330+
let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
1331+
let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
1332+
let storage: Arc<dyn Storage + Send + Sync> = logging.clone();
1333+
let repository = create_repo_with_split_manifest_config(
1334+
&array_path,
1335+
&shape,
1336+
&dimension_names,
1337+
&split_config,
1338+
Some(Arc::clone(&storage)),
1339+
)
1340+
.await?;
1341+
1342+
let verify_data = async |session: &Session, offset: u32| {
1343+
for idx in 0..12 {
1344+
let actual = get_chunk(
1345+
session
1346+
.get_chunk_reader(
1347+
&array_path,
1348+
&ChunkIndices(vec![idx.clone(), 0, 0]),
1349+
&ByteRange::ALL,
1350+
)
1351+
.await
1352+
.unwrap(),
1353+
)
1354+
.await
1355+
.unwrap()
1356+
.unwrap();
1357+
let expected = Bytes::copy_from_slice(format!("{0}", idx + offset).as_bytes());
1358+
assert_eq!(actual, expected);
1359+
}
1360+
};
1361+
1362+
let mut session = repository.writable_session("main").await?;
1363+
for i in 0..12 {
1364+
session
1365+
.set_chunk_ref(
1366+
array_path.clone(),
1367+
ChunkIndices(vec![i, 0, 0]),
1368+
Some(ChunkPayload::Inline(format!("{0}", i).into())),
1369+
)
1370+
.await?
1371+
}
1372+
verify_data(&session, 0).await;
1373+
1374+
let node = session.get_node(&array_path).await?;
1375+
let orig_splits = session.lookup_splits(&node.id).cloned();
1376+
assert_eq!(
1377+
orig_splits,
1378+
Some(ManifestSplits::from_edges(vec![
1379+
vec![0, 3, 6, 9, 12, 13],
1380+
vec![0, 2],
1381+
vec![0, 1]
1382+
]))
1383+
);
1384+
1385+
// this should update the splits
1386+
session
1387+
.update_array(
1388+
&array_path,
1389+
shape.clone(),
1390+
new_dimension_names.clone(),
1391+
array_def.clone(),
1392+
)
1393+
.await?;
1394+
verify_data(&session, 0).await;
1395+
let new_splits = session.lookup_splits(&node.id).cloned();
1396+
assert_eq!(
1397+
new_splits,
1398+
Some(ManifestSplits::from_edges(vec![
1399+
vec![0, 4, 8, 12, 13],
1400+
vec![0, 2],
1401+
vec![0, 1]
1402+
]))
1403+
);
1404+
1405+
// update data
1406+
for i in 0..12 {
1407+
session
1408+
.set_chunk_ref(
1409+
array_path.clone(),
1410+
ChunkIndices(vec![i, 0, 0]),
1411+
Some(ChunkPayload::Inline(format!("{0}", i+10).into())),
1412+
)
1413+
.await?
1414+
}
1415+
verify_data(&session, 10).await;
12951416

12961417
Ok(())
12971418
}
12981419

1299-
#[tokio::test]
1420+
#[tokio_test]
13001421
async fn tests_manifest_splitting_simple() -> Result<(), Box<dyn Error>> {
13011422
let dim_size = 25u32;
13021423
let chunk_size = 1u32;

icechunk/src/session.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,16 +500,20 @@ impl Session {
500500
self.set_node_chunk_ref(node_snapshot, coord, data).await
501501
}
502502

503+
pub fn lookup_splits(&self, node_id: &NodeId) -> Option<&ManifestSplits> {
504+
self.splits.get(node_id)
505+
}
506+
503507
fn cache_splits(
504508
&mut self,
505509
node_id: &NodeId,
506510
path: &Path,
507511
shape: &ArrayShape,
508512
dimension_names: &Option<Vec<DimensionName>>,
509513
) {
510-
// FIXME: handle conflicts here
511514
// Q: What happens if we set a chunk, then change a dimension name, so
512515
// that the split changes.
516+
// A: we reorg the existing chunk refs in the changeset to the new splits.
513517
let splitting = self.config.manifest().splitting();
514518
let splits = splitting.get_split_sizes(path, shape, dimension_names);
515519
self.splits.insert(node_id.clone(), splits);
@@ -1842,6 +1846,7 @@ impl ManifestSplittingConfig {
18421846
} in dim_specs.iter()
18431847
{
18441848
if dim_condition.matches(axis, dimname.clone().into()) {
1849+
dbg!(&dim_condition, "matched", &dimname);
18451850
edges[axis] = uniform_manifest_split_edges(
18461851
num_chunks[axis],
18471852
split_size,

0 commit comments

Comments
 (0)