Skip to content

Commit 1da89b9

Browse files
committed
feat: refactor TraversalCache to remove synchronous append and take methods, replacing them with stream-based alternatives for improved performance and memory efficiency
1 parent 57980fb commit 1da89b9

1 file changed

Lines changed: 37 additions & 131 deletions

File tree

versatiles_container/src/cache/traversal_cache.rs

Lines changed: 37 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -102,33 +102,6 @@ impl<V: CacheValue> TraversalCache<V> {
102102
})
103103
}
104104

105-
/// Append values to the cache entry at `index`.
106-
///
107-
/// In Disk mode, writes all values to a single new file.
108-
#[context("Failed to append to traversal cache at index {}", index)]
109-
pub fn append(&self, index: usize, values: Vec<V>) -> Result<()> {
110-
match self {
111-
Self::Memory(map) => {
112-
map.entry(index).or_default().extend(values);
113-
Ok(())
114-
}
115-
Self::Disk {
116-
path,
117-
next_writer_id,
118-
file_index,
119-
..
120-
} => {
121-
let (mut writer, file_path) = Self::create_cache_file(path, index, next_writer_id)?;
122-
for value in &values {
123-
value.write_to_cache(&mut writer)?;
124-
}
125-
writer.flush()?;
126-
file_index.entry(index).or_default().push(file_path);
127-
Ok(())
128-
}
129-
}
130-
}
131-
132105
/// Append values from a stream to the cache entry at `index`.
133106
///
134107
/// Values are consumed one at a time, so peak memory usage is independent
@@ -163,30 +136,6 @@ impl<V: CacheValue> TraversalCache<V> {
163136
}
164137
}
165138

166-
/// Take and return all values at `index`, removing them from the cache.
167-
///
168-
/// In Disk mode, reads all files for this index sequentially and collects
169-
/// all values into a single `Vec`. Returns `Ok(None)` if no entry exists.
170-
#[context("Failed to take from traversal cache at index {}", index)]
171-
pub fn take(&self, index: usize) -> Result<Option<Vec<V>>> {
172-
match self {
173-
Self::Memory(map) => Ok(map.remove(&index).map(|(_, v)| v)),
174-
Self::Disk { path, file_index, .. } => {
175-
let Some(files) = Self::take_index_files(file_index, index) else {
176-
return Ok(None);
177-
};
178-
let mut values = Vec::new();
179-
for file_path in &files {
180-
for result in Self::iter_values_from_file(file_path)? {
181-
values.push(result?);
182-
}
183-
}
184-
remove_dir_all(path.join(index.to_string())).ok();
185-
Ok(Some(values))
186-
}
187-
}
188-
}
189-
190139
/// Take all values at `index` as a stream, removing them from the cache.
191140
///
192141
/// In Disk mode, files are read concurrently on blocking threads and
@@ -330,7 +279,10 @@ mod tests {
330279
#[rstest]
331280
#[case::mem("mem")]
332281
#[case::disk("disk")]
333-
fn test_append_and_take(#[case] case: &str) -> Result<()> {
282+
#[tokio::test]
283+
async fn test_append_and_take_stream(#[case] case: &str) -> Result<()> {
284+
use futures::StreamExt;
285+
334286
let cache_type = match case {
335287
"mem" => CacheType::InMemory,
336288
"disk" => CacheType::Disk(TempDir::new()?.path().to_path_buf()),
@@ -339,80 +291,40 @@ mod tests {
339291
let cache = TraversalCache::<String>::new(&cache_type)?;
340292

341293
// Initially empty
342-
assert_eq!(cache.take(0)?, None);
343-
assert_eq!(cache.take(1)?, None);
294+
assert!(cache.take_stream(0)?.is_none());
295+
assert!(cache.take_stream(1)?.is_none());
344296

345-
// Append to index 0
346-
cache.append(0, vec!["a".to_string(), "b".to_string()])?;
297+
// Append via stream to index 0
298+
cache
299+
.append_stream(0, futures::stream::iter(vec!["a".to_string(), "b".to_string()]))
300+
.await?;
347301

348-
// Append more to index 0
349-
cache.append(0, vec!["c".to_string()])?;
302+
// Append more via stream to same index
303+
cache
304+
.append_stream(0, futures::stream::iter(vec!["c".to_string()]))
305+
.await?;
350306

351307
// Append to different index
352-
cache.append(1, vec!["x".to_string()])?;
308+
cache
309+
.append_stream(1, futures::stream::iter(vec!["x".to_string()]))
310+
.await?;
353311

354-
// Take preserves order
355-
assert_eq!(
356-
cache.take(0)?,
357-
Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
358-
);
312+
// take_stream and collect (order across files is non-deterministic in Disk mode)
313+
let stream = cache.take_stream(0)?.unwrap();
314+
let mut collected: Vec<String> = stream.collect().await;
315+
collected.sort();
316+
assert_eq!(collected, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
359317

360-
// After take, index is empty
361-
assert_eq!(cache.take(0)?, None);
318+
// After take_stream, index is empty
319+
assert!(cache.take_stream(0)?.is_none());
362320

363321
// Other index still has data
364-
assert_eq!(cache.take(1)?, Some(vec!["x".to_string()]));
365-
366-
Ok(())
367-
}
368-
369-
#[rstest]
370-
#[case::mem("mem")]
371-
#[case::disk("disk")]
372-
fn test_binary_values(#[case] case: &str) -> Result<()> {
373-
let cache_type = match case {
374-
"mem" => CacheType::InMemory,
375-
"disk" => CacheType::Disk(TempDir::new()?.path().to_path_buf()),
376-
_ => panic!("unknown case"),
377-
};
378-
let cache = TraversalCache::<Vec<u8>>::new(&cache_type)?;
379-
380-
cache.append(0, vec![vec![0, 1, 2], vec![255, 254]])?;
381-
cache.append(0, vec![vec![128]])?;
382-
383-
assert_eq!(cache.take(0)?, Some(vec![vec![0, 1, 2], vec![255, 254], vec![128]]));
384-
385-
Ok(())
386-
}
387-
388-
#[rstest]
389-
#[case::mem("mem")]
390-
#[case::disk("disk")]
391-
#[tokio::test]
392-
async fn test_append_stream(#[case] case: &str) -> Result<()> {
393-
let cache_type = match case {
394-
"mem" => CacheType::InMemory,
395-
"disk" => CacheType::Disk(TempDir::new()?.path().to_path_buf()),
396-
_ => panic!("unknown case"),
397-
};
398-
let cache = TraversalCache::<String>::new(&cache_type)?;
322+
let stream1 = cache.take_stream(1)?.unwrap();
323+
let collected1: Vec<String> = stream1.collect().await;
324+
assert_eq!(collected1, vec!["x".to_string()]);
399325

400-
// Append via stream
401-
let stream = futures::stream::iter(vec!["a".to_string(), "b".to_string()]);
402-
cache.append_stream(0, stream).await?;
403-
404-
// Append more via stream to same index
405-
let stream2 = futures::stream::iter(vec!["c".to_string()]);
406-
cache.append_stream(0, stream2).await?;
407-
408-
// Take preserves order across stream appends
409-
assert_eq!(
410-
cache.take(0)?,
411-
Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
412-
);
413-
414-
// After take, index is empty
415-
assert_eq!(cache.take(0)?, None);
326+
// Non-existent index returns None
327+
assert!(cache.take_stream(99)?.is_none());
416328

417329
Ok(())
418330
}
@@ -421,31 +333,25 @@ mod tests {
421333
#[case::mem("mem")]
422334
#[case::disk("disk")]
423335
#[tokio::test]
424-
async fn test_take_stream(#[case] case: &str) -> Result<()> {
336+
async fn test_binary_values_stream(#[case] case: &str) -> Result<()> {
425337
use futures::StreamExt;
426338

427339
let cache_type = match case {
428340
"mem" => CacheType::InMemory,
429341
"disk" => CacheType::Disk(TempDir::new()?.path().to_path_buf()),
430342
_ => panic!("unknown case"),
431343
};
432-
let cache = TraversalCache::<String>::new(&cache_type)?;
344+
let cache = TraversalCache::<Vec<u8>>::new(&cache_type)?;
433345

434-
// Append data
435-
cache.append(0, vec!["a".to_string(), "b".to_string()])?;
436-
cache.append(0, vec!["c".to_string()])?;
346+
cache
347+
.append_stream(0, futures::stream::iter(vec![vec![0, 1, 2], vec![255, 254]]))
348+
.await?;
349+
cache.append_stream(0, futures::stream::iter(vec![vec![128]])).await?;
437350

438-
// take_stream and collect (order across files is non-deterministic)
439351
let stream = cache.take_stream(0)?.unwrap();
440-
let mut collected: Vec<String> = stream.collect().await;
352+
let mut collected: Vec<Vec<u8>> = stream.collect().await;
441353
collected.sort();
442-
assert_eq!(collected, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
443-
444-
// After take_stream, index is empty
445-
assert!(cache.take_stream(0)?.is_none());
446-
447-
// Non-existent index returns None
448-
assert!(cache.take_stream(99)?.is_none());
354+
assert_eq!(collected, vec![vec![0, 1, 2], vec![128], vec![255, 254]]);
449355

450356
Ok(())
451357
}

0 commit comments

Comments
 (0)