Skip to content

Commit 2bd36e5

Browse files
committed
impl skip PrimitiveValueDecoder
1 parent 367b999 commit 2bd36e5

9 files changed

Lines changed: 124 additions & 17 deletions

File tree

src/array_decoder/decimal.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ struct DecimalScaleRepairDecoder {
112112
}
113113

114114
impl PrimitiveValueDecoder<i128> for DecimalScaleRepairDecoder {
115+
/// Advances both underlying streams past the next `n` decimal values.
///
/// Each decimal is assembled from one entry of the varint stream and one
/// entry of the scale stream, so both are moved forward by the same count.
/// The first failure is returned and the scale stream is left untouched if
/// the varint stream could not be skipped.
fn skip(&mut self, n: usize) -> Result<()> {
    self.varint_iter
        .skip(n)
        .and_then(|()| self.scale_iter.skip(n))
}
120+
115121
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
116122
// TODO: can probably optimize, reuse buffers?
117123
let mut varint = vec![0; out.len()];

src/array_decoder/mod.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ pub trait ArrayBatchDecoder: Send {
7474
batch_size: usize,
7575
parent_present: Option<&NullBuffer>,
7676
) -> Result<ArrayRef>;
77+
78+
/// Skip the next `n` values without decoding them, failing if it cannot skip the enough values.
79+
fn skip_values(&mut self, n: usize) -> Result<()> {
80+
// TODO: implement
81+
Ok(())
82+
}
7783
}
7884

7985
struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
@@ -122,6 +128,24 @@ impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
122128
let array = Arc::new(array) as ArrayRef;
123129
Ok(array)
124130
}
131+
132+
fn skip_rows(&mut self, row_count: usize) -> Result<()> {
133+
// If we have a present stream, we need to decode it to know how many
134+
// non-null values to skip in the data stream
135+
let non_null_count = if let Some(ref mut present) = self.present {
136+
let mut present_buffer = vec![false; row_count];
137+
present.inner.decode(&mut present_buffer)?;
138+
// Count non-null values (where present is true)
139+
present_buffer.iter().filter(|&&v| v).count()
140+
} else {
141+
// No nulls, so all rows have values
142+
row_count
143+
};
144+
145+
// Skip the data stream for non-null values only
146+
self.iter.skip(non_null_count)?;
147+
Ok(())
148+
}
125149
}
126150

127151
type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
@@ -167,6 +191,24 @@ impl ArrayBatchDecoder for BooleanArrayDecoder {
167191
};
168192
Ok(Arc::new(array))
169193
}
194+
195+
fn skip_rows(&mut self, row_count: usize) -> Result<()> {
196+
// If we have a present stream, we need to decode it to know how many
197+
// non-null values to skip in the data stream
198+
let non_null_count = if let Some(ref mut present) = self.present {
199+
let mut present_buffer = vec![false; row_count];
200+
present.inner.decode(&mut present_buffer)?;
201+
// Count non-null values (where present is true)
202+
present_buffer.iter().filter(|&&v| v).count()
203+
} else {
204+
// No nulls, so all rows have values
205+
row_count
206+
};
207+
208+
// Skip the data stream for non-null values only
209+
self.iter.skip(non_null_count)?;
210+
Ok(())
211+
}
170212
}
171213

172214
struct PresentDecoder {
@@ -514,14 +556,11 @@ impl NaiveStripeDecoder {
514556
})
515557
}
516558

517-
/// Skip the specified number of rows by decoding and discarding them
559+
/// Skip the specified number of rows by calling skip_rows on each decoder
518560
fn skip_rows(&mut self, count: usize) -> Result<()> {
519-
// Decode in batches to avoid large memory allocations
520-
let mut remaining = count;
521-
while remaining > 0 {
522-
let chunk = self.batch_size.min(remaining);
523-
let _ = self.inner_decode_next_batch(chunk)?;
524-
remaining -= chunk;
561+
// Call skip_rows on each decoder to efficiently skip rows
562+
for decoder in &mut self.decoders {
563+
decoder.skip_rows(count)?;
525564
}
526565
Ok(())
527566
}

src/array_decoder/timestamp.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,11 @@ impl TimestampNanosecondAsDecimalWithTzDecoder {
308308
}
309309

310310
impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalWithTzDecoder {
311+
/// Skips the next `n` values by forwarding to the wrapped decoder.
///
/// The timezone adjustment applied in `decode` only affects values that are
/// actually produced, so skipping needs no extra work here.
fn skip(&mut self, n: usize) -> Result<()> {
    let inner = &mut self.0;
    inner.skip(n)
}
315+
311316
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
312317
self.0.decode(out)?;
313318
for x in out.iter_mut() {

src/encoding/boolean.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ impl<R: Read> BooleanDecoder<R> {
5555
}
5656

5757
impl<R: Read> PrimitiveValueDecoder<bool> for BooleanDecoder<R> {
58+
/// Skips the next `n` boolean values, failing if the stream ends first.
///
/// ORC boolean streams pack eight values into each byte of a byte
/// run-length stream. Forwarding `n` to the inner decoder would therefore
/// advance by `n` of its own units rather than `n` booleans, and would
/// ignore any bits already pulled from a partially consumed byte.
/// NOTE(review): the inner decoder's unit and this type's bit-buffer fields
/// are not visible in this hunk — confirm against the struct definition.
///
/// To stay correct regardless of the internal layout, values are skipped by
/// decoding them into a small scratch buffer that is then discarded.
///
/// # Errors
///
/// Returns the first error produced by `decode`.
fn skip(&mut self, n: usize) -> Result<()> {
    /// Number of booleans decoded and discarded per iteration.
    const SCRATCH_LEN: usize = 1024;

    let mut scratch = [false; SCRATCH_LEN];
    let mut remaining = n;
    while remaining > 0 {
        let len = remaining.min(SCRATCH_LEN);
        self.decode(&mut scratch[..len])?;
        remaining -= len;
    }
    Ok(())
}
62+
5863
// TODO: can probably implement this better
5964
fn decode(&mut self, out: &mut [bool]) -> Result<()> {
6065
for x in out.iter_mut() {

src/encoding/decimal.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ impl<R: Read> UnboundedVarintStreamDecoder<R> {
3636
}
3737

3838
impl<R: Read> PrimitiveValueDecoder<i128> for UnboundedVarintStreamDecoder<R> {
39+
fn skip(&mut self, n: usize) -> Result<()> {
40+
for _ in 0..n {
41+
read_varint_zigzagged::<i128, _, SignedEncoding>(&mut self.reader)?;
42+
}
43+
Ok(())
44+
}
45+
3946
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
4047
for x in out.iter_mut() {
4148
*x = read_varint_zigzagged::<i128, _, SignedEncoding>(&mut self.reader)?;

src/encoding/float.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut};
2222
use snafu::ResultExt;
2323

2424
use crate::{
25-
error::{IoSnafu, Result},
25+
error::{IoSnafu, OutOfSpecSnafu, Result},
2626
memory::EstimateMemory,
2727
};
2828

@@ -36,12 +36,12 @@ pub trait Float:
3636
impl Float for f32 {}
3737
impl Float for f64 {}
3838

39-
pub struct FloatDecoder<F: Float, R: std::io::Read> {
39+
pub struct FloatDecoder<F: Float, R: std::io::Read + std::io::Seek> {
4040
reader: R,
4141
phantom: std::marker::PhantomData<F>,
4242
}
4343

44-
impl<F: Float, R: std::io::Read> FloatDecoder<F, R> {
44+
impl<F: Float, R: std::io::Read + std::io::Seek> FloatDecoder<F, R> {
4545
pub fn new(reader: R) -> Self {
4646
Self {
4747
reader,
@@ -50,7 +50,28 @@ impl<F: Float, R: std::io::Read> FloatDecoder<F, R> {
5050
}
5151
}
5252

53-
impl<F: Float, R: std::io::Read> PrimitiveValueDecoder<F> for FloatDecoder<F, R> {
53+
impl<F: Float, R: std::io::Read + std::io::Seek> PrimitiveValueDecoder<F> for FloatDecoder<F, R> {
54+
fn skip(&mut self, n: usize) -> Result<()> {
55+
let bytes_to_skip = n * std::mem::size_of::<F>();
56+
let cur_pos = self.reader.stream_position().context(IoSnafu)?;
57+
let seek_pos = self
58+
.reader
59+
.seek(std::io::SeekFrom::Current(bytes_to_skip as i64))
60+
.context(IoSnafu)?;
61+
let actual_skip = seek_pos - cur_pos;
62+
if actual_skip != bytes_to_skip as u64 {
63+
return OutOfSpecSnafu {
64+
msg: format!(
65+
"failed to skip float values, expected to skip {} bytes, but actually skipped {}",
66+
bytes_to_skip, actual_skip
67+
),
68+
}
69+
.fail();
70+
}
71+
72+
Ok(())
73+
}
74+
5475
fn decode(&mut self, out: &mut [F]) -> Result<()> {
5576
let bytes = must_cast_slice_mut::<F, u8>(out);
5677
self.reader.read_exact(bytes).context(IoSnafu)?;

src/encoding/mod.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub trait PrimitiveValueEncoder<V: Copy>: EstimateMemory {
5151
}
5252

5353
pub trait PrimitiveValueDecoder<V> {
54-
/// Skip the next `n` values without decoding them. Failing if it cannot skip the enough values.
54+
/// Skip the next `n` values without decoding them, failing if it cannot skip enough values.
5555
fn skip(&mut self, n: usize) -> Result<()>;
5656

5757
/// Decode out.len() values into out at a time, failing if it cannot fill
@@ -98,14 +98,25 @@ mod tests {
9898
use super::*;
9999

100100
/// Emits numbers increasing from 0.
101-
struct DummyDecoder;
101+
struct DummyDecoder {
    /// Offset added to every value emitted by `decode`.
    value: i32,
}

impl DummyDecoder {
    /// Creates a decoder whose offset starts at zero.
    fn new() -> Self {
        let value = 0;
        Self { value }
    }
}
102110

103111
impl PrimitiveValueDecoder<i32> for DummyDecoder {
104-
fn skip(&mut self, n: usize) -> Result<()> {
112+
fn skip(&mut self, _n: usize) -> Result<()> {
113+
self.value += 1;
105114
Ok(())
106115
}
107116
fn decode(&mut self, out: &mut [i32]) -> Result<()> {
108-
let values = (0..out.len()).map(|x| x as i32).collect::<Vec<_>>();
117+
let values = (0..out.len())
118+
.map(|x| self.value + x as i32)
119+
.collect::<Vec<_>>();
109120
out.copy_from_slice(&values);
110121
Ok(())
111122
}
@@ -128,7 +139,7 @@ mod tests {
128139
proptest! {
129140
#[test]
130141
fn decode_spaced_proptest(present: Vec<bool>) {
131-
let mut decoder = DummyDecoder;
142+
let mut decoder = DummyDecoder::new();
132143
let mut out = vec![-1; present.len()];
133144
decoder.decode_spaced(&mut out, &NullBuffer::from(present.clone())).unwrap();
134145
let expected = gen_spaced_dummy_decoder_expected(&present);
@@ -138,7 +149,7 @@ mod tests {
138149

139150
#[test]
140151
fn decode_spaced_edge_cases() {
141-
let mut decoder = DummyDecoder;
152+
let mut decoder = DummyDecoder::new();
142153
let len = 10;
143154

144155
// all present

src/encoding/rle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub trait GenericRle<V: Copy> {
6161

6262
impl<V: Copy, G: GenericRle<V> + sealed::Rle> PrimitiveValueDecoder<V> for G {
6363
/// Skips the next `n` values by forwarding to `skip_values`; any error it
/// reports is returned unchanged.
fn skip(&mut self, n: usize) -> Result<()> {
64+
// Delegate to the GenericRle implementation
6465
self.skip_values(n)
6566
}
6667

src/encoding/timestamp.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ impl<T: ArrowTimestampType> TimestampDecoder<T> {
5050
}
5151

5252
impl<T: ArrowTimestampType> PrimitiveValueDecoder<T::Native> for TimestampDecoder<T> {
53+
/// Skips the next `n` timestamps.
///
/// A timestamp is built from one entry of the `data` stream and one entry
/// of the `secondary` stream, so both are advanced by `n`. If skipping
/// `data` fails, `secondary` is not touched and the error is returned.
fn skip(&mut self, n: usize) -> Result<()> {
    self.data.skip(n)?;
    // The result of the second skip is the result of the whole call.
    self.secondary.skip(n)
}
58+
5359
fn decode(&mut self, out: &mut [T::Native]) -> Result<()> {
5460
// TODO: can probably optimize, reuse buffers?
5561
let mut data = vec![0; out.len()];
@@ -90,6 +96,12 @@ impl TimestampNanosecondAsDecimalDecoder {
9096
}
9197

9298
impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalDecoder {
99+
/// Skips the next `n` timestamps in both underlying streams.
///
/// `data` is advanced first; `secondary` is advanced only if that succeeds.
/// The first error encountered is returned.
fn skip(&mut self, n: usize) -> Result<()> {
    self.data
        .skip(n)
        .and_then(|()| self.secondary.skip(n))
}
104+
93105
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
94106
// TODO: can probably optimize, reuse buffers?
95107
let mut data = vec![0; out.len()];

0 commit comments

Comments
 (0)