Skip to content

Commit ae72821

Browse files
jeffhuen and claude authored
Optimize memory management in streaming parsers and encoding (#2)
* Fix memory leaks in streaming parsers and reduce thread pool overhead

  Streaming parsers (StreamingParser, GeneralStreamingParser, GeneralStreamingParserNewlines) had three memory leak patterns:

  1. compact_buffer() used Vec::drain(), which preserves peak allocation capacity even after removing most data. Added shrink_excess() to reclaim memory when capacity exceeds 4x length.
  2. take_rows() used drain().collect(), leaving complete_rows at peak capacity. Now shrinks after draining.
  3. finalize() left the internal buffer allocated after extracting the final rows. Now releases buffer memory since parsing is complete.

  Also:
  - Reduce rayon thread pool stack from 8 MiB to 2 MiB per thread (saves ~48 MiB virtual memory across 8 persistent threads)
  - Remove unnecessary field.clone() in parallel encoder's encoding path
  - Add ExactSizeIterator impls for RowIter, RowFieldIter, FieldIter

  All 95 tests pass.

  https://claude.ai/code/session_01QdJE1Gks1uipLWVupAwrbe

* Apply Rust best practices: thiserror, #[must_use], fmt/clippy clean

  - Add thiserror for BufferOverflow: implements Display + Error traits as required for idiomatic Rust library error types
  - Add #[must_use] to key types: StructuralIndex, RowEnd, Newlines, BufferOverflow, StreamingParser, GeneralStreamingParser, GeneralStreamingParserNewlines, GeneralFieldBound, StreamingParserResource
  - Add #[must_use] to getter methods: available_rows(), has_partial(), buffer_size(), row_count(), max_pattern_len()
  - Fix import ordering in general.rs to pass cargo fmt
  - All quality gates pass: cargo fmt, clippy -D warnings, 95 tests

  https://claude.ai/code/session_01QdJE1Gks1uipLWVupAwrbe

* Fix ExactSizeIterator bugs, shrink_excess threshold, add tests

  Review fixes for PR #2:
  - Fix RowIter::next(): increment row_idx in trailing-row branch so ExactSizeIterator::len() returns 0 after exhaustion (was returning 1)
  - Fix RowFieldIter::next(): same trailing-row row_idx fix
  - Fix FieldIter::size_hint(): check done flag so len() returns 0 after last field is consumed (was returning 1)
  - Fix shrink_excess(): use byte-based 1 KiB floor via size_of::<T>() instead of element-count 1024 (doc said bytes, code used elements)
  - Add ExactSizeIterator tests for RowIter, RowFieldIter, FieldIter covering trailing rows and multi-field exhaustion
  - Add shrink_excess tests: threshold, floor, ratio, large-element types
  - Add finalize/reset memory release tests

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent ac418cb commit ae72821

File tree

9 files changed

+267
-22
lines changed

9 files changed

+267
-22
lines changed

native/rustycsv/Cargo.lock

Lines changed: 22 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/rustycsv/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ crate-type = ["cdylib", "lib"]
1212
[dependencies]
1313
rustler = { git = "https://github.com/rusterlium/rustler.git", branch = "master" }
1414
rayon = "1.10" # Data parallelism for parallel parsing
15+
thiserror = "2" # Idiomatic error types for library code
1516
mimalloc = { version = "0.1", default-features = false, optional = true }
1617

1718
[target.'cfg(target_env = "musl")'.dependencies]

native/rustycsv/src/core/newlines.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
/// checks against the custom patterns.
66
77
#[derive(Debug, Clone)]
8+
#[must_use]
89
pub struct Newlines {
910
/// Newline patterns sorted longest-first for greedy matching.
1011
pub patterns: Vec<Vec<u8>>,
@@ -32,6 +33,7 @@ impl Newlines {
3233
}
3334

3435
/// Maximum pattern length (used for chunk-boundary safety in streaming).
36+
#[must_use]
3537
pub fn max_pattern_len(&self) -> usize {
3638
self.patterns.iter().map(|p| p.len()).max().unwrap_or(1)
3739
}

native/rustycsv/src/core/simd_index.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
/// A newline terminator position.
77
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8+
#[must_use]
89
pub struct RowEnd {
910
/// Byte position of terminator start (\n or \r in \r\n).
1011
pub pos: u32,
@@ -14,6 +15,7 @@ pub struct RowEnd {
1415

1516
/// Structural index: positions of all unquoted separators and row endings.
1617
#[derive(Debug)]
18+
#[must_use]
1719
pub struct StructuralIndex {
1820
/// Positions of unquoted field separators (commas, tabs, etc.).
1921
pub field_seps: Vec<u32>,
@@ -58,6 +60,7 @@ impl StructuralIndex {
5860

5961
/// Number of rows.
6062
#[inline]
63+
#[must_use]
6164
pub fn row_count(&self) -> usize {
6265
let n = self.row_ends.len();
6366
// If there's content after the last row_end (no trailing newline), there's one more row.
@@ -130,6 +133,7 @@ impl<'a> Iterator for RowIter<'a> {
130133
let start = self.pos;
131134
let end = self.index.input_len;
132135
self.pos = end;
136+
self.row_idx += 1;
133137
Some((start, end, end))
134138
} else {
135139
None
@@ -144,6 +148,8 @@ impl<'a> Iterator for RowIter<'a> {
144148
}
145149
}
146150

151+
impl ExactSizeIterator for RowIter<'_> {}
152+
147153
/// A single row from the cursor-based iterator, with its field bounds.
148154
pub struct Row<'a> {
149155
pub start: u32,
@@ -177,6 +183,7 @@ impl<'a> Iterator for RowFieldIter<'a> {
177183
let start = self.pos;
178184
let end = self.index.input_len;
179185
self.pos = end;
186+
self.row_idx += 1;
180187
(start, end)
181188
} else {
182189
return None;
@@ -210,6 +217,8 @@ impl<'a> Iterator for RowFieldIter<'a> {
210217
}
211218
}
212219

220+
impl ExactSizeIterator for RowFieldIter<'_> {}
221+
213222
/// Iterator over fields in a single row.
214223
pub struct FieldIter<'a> {
215224
seps: &'a [u32],
@@ -249,11 +258,16 @@ impl<'a> Iterator for FieldIter<'a> {
249258

250259
#[inline]
251260
fn size_hint(&self) -> (usize, Option<usize>) {
261+
if self.done {
262+
return (0, Some(0));
263+
}
252264
let remaining = (self.seps.len() + 1).saturating_sub(self.idx);
253265
(remaining, Some(remaining))
254266
}
255267
}
256268

269+
impl ExactSizeIterator for FieldIter<'_> {}
270+
257271
#[cfg(test)]
258272
mod tests {
259273
use super::*;
@@ -373,4 +387,74 @@ mod tests {
373387
assert_eq!(cursor[0], vec![(0, 1), (2, 3)]); // a,b
374388
assert_eq!(cursor[1], vec![(4, 5)]); // c
375389
}
390+
391+
// --- ExactSizeIterator correctness tests ---
392+
393+
#[test]
394+
fn row_iter_exact_size_with_trailing_row() {
395+
// "a\nb" — 2 rows, second has no trailing newline
396+
let idx = make_index(vec![], vec![RowEnd { pos: 1, len: 1 }], 3);
397+
let mut iter = idx.rows();
398+
399+
assert_eq!(iter.len(), 2);
400+
let _ = iter.next(); // consume row 1
401+
assert_eq!(iter.len(), 1);
402+
let _ = iter.next(); // consume trailing row
403+
assert_eq!(iter.len(), 0);
404+
assert!(iter.next().is_none());
405+
}
406+
407+
#[test]
408+
fn row_iter_exact_size_no_trailing_row() {
409+
// "a\n" — 1 row with trailing newline
410+
let idx = make_index(vec![], vec![RowEnd { pos: 1, len: 1 }], 2);
411+
let mut iter = idx.rows();
412+
413+
assert_eq!(iter.len(), 1);
414+
let _ = iter.next();
415+
assert_eq!(iter.len(), 0);
416+
assert!(iter.next().is_none());
417+
}
418+
419+
#[test]
420+
fn row_field_iter_exact_size_with_trailing_row() {
421+
// "a\nb" — 2 rows, second has no trailing newline
422+
let idx = make_index(vec![], vec![RowEnd { pos: 1, len: 1 }], 3);
423+
let mut iter = idx.rows_with_fields();
424+
425+
assert_eq!(iter.len(), 2);
426+
let _ = iter.next();
427+
assert_eq!(iter.len(), 1);
428+
let _ = iter.next(); // trailing row
429+
assert_eq!(iter.len(), 0);
430+
assert!(iter.next().is_none());
431+
}
432+
433+
#[test]
434+
fn field_iter_exact_size_single_field() {
435+
// Row "abc" — 1 field, no separators
436+
let idx = make_index(vec![], vec![RowEnd { pos: 3, len: 1 }], 4);
437+
let mut fields = idx.fields_in_row(0, 3);
438+
439+
assert_eq!(fields.len(), 1);
440+
let _ = fields.next(); // consume the only field
441+
assert_eq!(fields.len(), 0);
442+
assert!(fields.next().is_none());
443+
}
444+
445+
#[test]
446+
fn field_iter_exact_size_multiple_fields() {
447+
// Row "a,b,c" — 3 fields, seps at 1 and 3
448+
let idx = make_index(vec![1, 3], vec![RowEnd { pos: 5, len: 1 }], 6);
449+
let mut fields = idx.fields_in_row(0, 5);
450+
451+
assert_eq!(fields.len(), 3);
452+
let _ = fields.next(); // field "a"
453+
assert_eq!(fields.len(), 2);
454+
let _ = fields.next(); // field "b"
455+
assert_eq!(fields.len(), 1);
456+
let _ = fields.next(); // field "c" (last, sets done=true)
457+
assert_eq!(fields.len(), 0);
458+
assert!(fields.next().is_none());
459+
}
376460
}

native/rustycsv/src/lib.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,23 +1575,21 @@ fn encode_string_parallel<'a>(
15751575
continue;
15761576
}
15771577

1578-
let utf8_field: Vec<u8> = if needs_quoting {
1578+
if needs_quoting {
15791579
let mut buf = Vec::with_capacity(field.len() + 8);
15801580
write_quoted_field(&mut buf, field, esc);
1581-
buf
1581+
if needs_encoding {
1582+
let encoded = encode_utf8_to_target(&buf, encoding);
1583+
out.extend_from_slice(&encoded);
1584+
} else {
1585+
out.extend_from_slice(&buf);
1586+
}
15821587
} else if needs_encoding {
1583-
field.clone()
1584-
} else {
1585-
// No formula, no quoting, no encoding — direct extend
1586-
out.extend_from_slice(field);
1587-
continue;
1588-
};
1589-
1590-
if needs_encoding {
1591-
let encoded = encode_utf8_to_target(&utf8_field, encoding);
1588+
// Encode directly — no clone needed
1589+
let encoded = encode_utf8_to_target(field, encoding);
15921590
out.extend_from_slice(&encoded);
15931591
} else {
1594-
out.extend_from_slice(&utf8_field);
1592+
out.extend_from_slice(field);
15951593
}
15961594
}
15971595
out.extend_from_slice(&ls_encoded);

native/rustycsv/src/resource.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl StreamingParserEnum {
7474
}
7575

7676
/// Wrapper for StreamingParser that can be stored in a ResourceArc
77+
#[must_use]
7778
pub struct StreamingParserResource {
7879
pub inner: Mutex<StreamingParserEnum>,
7980
}

native/rustycsv/src/strategy/general.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
use std::borrow::Cow;
1111

12+
use super::streaming::shrink_excess;
1213
use crate::core::newlines::{match_newline, Newlines};
1314

1415
// ============================================================================
@@ -187,6 +188,7 @@ fn parse_row_general<'a>(
187188

188189
/// Field boundary for general parsing
189190
#[derive(Debug, Clone, Copy)]
191+
#[must_use]
190192
pub struct GeneralFieldBound {
191193
pub start: usize,
192194
pub end: usize,
@@ -546,6 +548,7 @@ fn parse_row_boundaries_general(
546548
// ============================================================================
547549

548550
/// Streaming parser that handles multi-byte separators and escapes
551+
#[must_use]
549552
pub struct GeneralStreamingParser {
550553
buffer: Vec<u8>,
551554
complete_rows: Vec<Vec<Vec<u8>>>,
@@ -658,22 +661,28 @@ impl GeneralStreamingParser {
658661
self.buffer.drain(0..self.partial_row_start);
659662
self.scan_pos -= self.partial_row_start;
660663
self.partial_row_start = 0;
664+
shrink_excess(&mut self.buffer);
661665
}
662666
}
663667

664668
pub fn take_rows(&mut self, max: usize) -> Vec<Vec<Vec<u8>>> {
665669
let take_count = max.min(self.complete_rows.len());
666-
self.complete_rows.drain(0..take_count).collect()
670+
let rows: Vec<_> = self.complete_rows.drain(0..take_count).collect();
671+
shrink_excess(&mut self.complete_rows);
672+
rows
667673
}
668674

675+
#[must_use]
669676
pub fn available_rows(&self) -> usize {
670677
self.complete_rows.len()
671678
}
672679

680+
#[must_use]
673681
pub fn has_partial(&self) -> bool {
674682
self.partial_row_start < self.buffer.len()
675683
}
676684

685+
#[must_use]
677686
pub fn buffer_size(&self) -> usize {
678687
self.buffer.len()
679688
}
@@ -684,8 +693,11 @@ impl GeneralStreamingParser {
684693
if !row.is_empty() {
685694
self.complete_rows.push(row);
686695
}
687-
self.partial_row_start = self.buffer.len();
688696
}
697+
// Release the buffer — parsing is done
698+
self.buffer = Vec::new();
699+
self.partial_row_start = 0;
700+
self.scan_pos = 0;
689701
std::mem::take(&mut self.complete_rows)
690702
}
691703
}
@@ -1199,6 +1211,7 @@ fn parse_row_boundaries_general_with_newlines(
11991211
}
12001212

12011213
/// Streaming parser with custom newline support.
1214+
#[must_use]
12021215
pub struct GeneralStreamingParserNewlines {
12031216
buffer: Vec<u8>,
12041217
complete_rows: Vec<Vec<Vec<u8>>>,
@@ -1317,22 +1330,28 @@ impl GeneralStreamingParserNewlines {
13171330
self.buffer.drain(0..self.partial_row_start);
13181331
self.scan_pos -= self.partial_row_start;
13191332
self.partial_row_start = 0;
1333+
shrink_excess(&mut self.buffer);
13201334
}
13211335
}
13221336

13231337
pub fn take_rows(&mut self, max: usize) -> Vec<Vec<Vec<u8>>> {
13241338
let take_count = max.min(self.complete_rows.len());
1325-
self.complete_rows.drain(0..take_count).collect()
1339+
let rows: Vec<_> = self.complete_rows.drain(0..take_count).collect();
1340+
shrink_excess(&mut self.complete_rows);
1341+
rows
13261342
}
13271343

1344+
#[must_use]
13281345
pub fn available_rows(&self) -> usize {
13291346
self.complete_rows.len()
13301347
}
13311348

1349+
#[must_use]
13321350
pub fn has_partial(&self) -> bool {
13331351
self.partial_row_start < self.buffer.len()
13341352
}
13351353

1354+
#[must_use]
13361355
pub fn buffer_size(&self) -> usize {
13371356
self.buffer.len()
13381357
}
@@ -1343,8 +1362,11 @@ impl GeneralStreamingParserNewlines {
13431362
if !row.is_empty() {
13441363
self.complete_rows.push(row);
13451364
}
1346-
self.partial_row_start = self.buffer.len();
13471365
}
1366+
// Release the buffer — parsing is done
1367+
self.buffer = Vec::new();
1368+
self.partial_row_start = 0;
1369+
self.scan_pos = 0;
13481370
std::mem::take(&mut self.complete_rows)
13491371
}
13501372
}

native/rustycsv/src/strategy/parallel.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ pub(crate) fn get_pool() -> Option<&'static rayon::ThreadPool> {
4242
rayon::ThreadPoolBuilder::new()
4343
.num_threads(recommended_threads())
4444
.thread_name(|i| format!("rustycsv-{i}"))
45+
// Reduce per-thread stack from the 8 MiB default to 2 MiB.
46+
// CSV field extraction has shallow call stacks; the default
47+
// wastes ~48 MiB of virtual memory across 8 persistent threads.
48+
.stack_size(2 * 1024 * 1024)
4549
.build()
4650
.ok()
4751
})

0 commit comments

Comments
 (0)