Skip to content

Commit d8595b6

Browse files
authored
Various refactoring (#39)
* Refactor retrieval of int decoders
  Getting an unsigned/signed integer decoder shouldn't need knowledge of a column; it only cares whether you need V1 or V2, so refactor to accommodate this. Also simplifies some methods by removing the check for invalid column encoding, as this shouldn't be checked at decoder retrieval time anyway.
* Remove read_stream methods from Column
  These are just indirections and serve no purpose as an abstraction, as they don't actually rely on fields from Column.
* Minor refactoring
* Minor refactoring
* Minor refactoring around stripe/column
* Allow RootDataType to look up all transitive column indices
1 parent 8adf0f8 commit d8595b6

14 files changed

Lines changed: 194 additions & 199 deletions

File tree

src/array_decoder/decimal.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::datatypes::Decimal128Type;
2424
use snafu::ResultExt;
2525

2626
use crate::encoding::decimal::UnboundedVarintStreamDecoder;
27-
use crate::encoding::integer::get_rle_reader;
27+
use crate::encoding::integer::get_signed_int_decoder;
2828
use crate::encoding::PrimitiveValueDecoder;
2929
use crate::error::ArrowSnafu;
3030
use crate::proto::stream::Kind;
@@ -38,13 +38,13 @@ pub fn new_decimal_decoder(
3838
stripe: &Stripe,
3939
precision: u32,
4040
fixed_scale: u32,
41-
) -> Result<Box<dyn ArrayBatchDecoder>> {
41+
) -> Box<dyn ArrayBatchDecoder> {
4242
let varint_iter = stripe.stream_map().get(column, Kind::Data);
4343
let varint_iter = Box::new(UnboundedVarintStreamDecoder::new(varint_iter));
4444

4545
// Scale is specified on a per varint basis (in addition to being encoded in the type)
4646
let scale_iter = stripe.stream_map().get(column, Kind::Secondary);
47-
let scale_iter = get_rle_reader::<i32, _>(column, scale_iter)?;
47+
let scale_iter = get_signed_int_decoder::<i32>(scale_iter, column.rle_version());
4848

4949
let present = PresentDecoder::from_stripe(stripe, column);
5050

@@ -55,12 +55,12 @@ pub fn new_decimal_decoder(
5555
};
5656
let iter = Box::new(iter);
5757

58-
Ok(Box::new(DecimalArrayDecoder::new(
58+
Box::new(DecimalArrayDecoder::new(
5959
precision as u8,
6060
fixed_scale as i8,
6161
iter,
6262
present,
63-
)))
63+
))
6464
}
6565

6666
/// Wrapper around PrimitiveArrayDecoder to allow specifying the precision and scale

src/array_decoder/list.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use snafu::ResultExt;
2424

2525
use crate::array_decoder::derive_present_vec;
2626
use crate::column::Column;
27-
use crate::encoding::integer::get_unsigned_rle_reader;
27+
use crate::encoding::integer::get_unsigned_int_decoder;
2828
use crate::encoding::PrimitiveValueDecoder;
2929
use crate::proto::stream::Kind;
3030

@@ -45,10 +45,10 @@ impl ListArrayDecoder {
4545
let present = PresentDecoder::from_stripe(stripe, column);
4646

4747
let child = &column.children()[0];
48-
let inner = array_decoder_factory(child, field.clone(), stripe)?;
48+
let inner = array_decoder_factory(child, field.data_type(), stripe)?;
4949

5050
let reader = stripe.stream_map().get(column, Kind::Length);
51-
let lengths = get_unsigned_rle_reader(column, reader);
51+
let lengths = get_unsigned_int_decoder(reader, column.rle_version());
5252

5353
Ok(Self {
5454
inner,

src/array_decoder/map.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use snafu::ResultExt;
2424

2525
use crate::array_decoder::derive_present_vec;
2626
use crate::column::Column;
27-
use crate::encoding::integer::get_unsigned_rle_reader;
27+
use crate::encoding::integer::get_unsigned_int_decoder;
2828
use crate::encoding::PrimitiveValueDecoder;
2929
use crate::error::{ArrowSnafu, Result};
3030
use crate::proto::stream::Kind;
@@ -50,13 +50,13 @@ impl MapArrayDecoder {
5050
let present = PresentDecoder::from_stripe(stripe, column);
5151

5252
let keys_column = &column.children()[0];
53-
let keys = array_decoder_factory(keys_column, keys_field.clone(), stripe)?;
53+
let keys = array_decoder_factory(keys_column, keys_field.data_type(), stripe)?;
5454

5555
let values_column = &column.children()[1];
56-
let values = array_decoder_factory(values_column, values_field.clone(), stripe)?;
56+
let values = array_decoder_factory(values_column, values_field.data_type(), stripe)?;
5757

5858
let reader = stripe.stream_map().get(column, Kind::Length);
59-
let lengths = get_unsigned_rle_reader(column, reader);
59+
let lengths = get_unsigned_int_decoder(reader, column.rle_version());
6060

6161
let fields = Fields::from(vec![keys_field, values_field]);
6262

src/array_decoder/mod.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray}
2121
use arrow::buffer::NullBuffer;
2222
use arrow::datatypes::ArrowNativeTypeOp;
2323
use arrow::datatypes::ArrowPrimitiveType;
24-
use arrow::datatypes::{DataType as ArrowDataType, Field};
24+
use arrow::datatypes::DataType as ArrowDataType;
2525
use arrow::datatypes::{
2626
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
2727
};
@@ -32,7 +32,7 @@ use crate::column::Column;
3232
use crate::encoding::boolean::BooleanDecoder;
3333
use crate::encoding::byte::ByteRleDecoder;
3434
use crate::encoding::float::FloatDecoder;
35-
use crate::encoding::integer::get_rle_reader;
35+
use crate::encoding::integer::get_signed_int_decoder;
3636
use crate::encoding::PrimitiveValueDecoder;
3737
use crate::error::{
3838
self, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
@@ -258,10 +258,10 @@ impl Iterator for NaiveStripeDecoder {
258258

259259
pub fn array_decoder_factory(
260260
column: &Column,
261-
field: Arc<Field>,
261+
hinted_arrow_type: &ArrowDataType,
262262
stripe: &Stripe,
263263
) -> Result<Box<dyn ArrayBatchDecoder>> {
264-
let decoder: Box<dyn ArrayBatchDecoder> = match (column.data_type(), field.data_type()) {
264+
let decoder: Box<dyn ArrayBatchDecoder> = match (column.data_type(), hinted_arrow_type) {
265265
// TODO: try make branches more generic, reduce duplication
266266
(DataType::Boolean { .. }, ArrowDataType::Boolean) => {
267267
let iter = stripe.stream_map().get(column, Kind::Data);
@@ -277,19 +277,19 @@ pub fn array_decoder_factory(
277277
}
278278
(DataType::Short { .. }, ArrowDataType::Int16) => {
279279
let iter = stripe.stream_map().get(column, Kind::Data);
280-
let iter = get_rle_reader(column, iter)?;
280+
let iter = get_signed_int_decoder(iter, column.rle_version());
281281
let present = PresentDecoder::from_stripe(stripe, column);
282282
Box::new(Int16ArrayDecoder::new(iter, present))
283283
}
284284
(DataType::Int { .. }, ArrowDataType::Int32) => {
285285
let iter = stripe.stream_map().get(column, Kind::Data);
286-
let iter = get_rle_reader(column, iter)?;
286+
let iter = get_signed_int_decoder(iter, column.rle_version());
287287
let present = PresentDecoder::from_stripe(stripe, column);
288288
Box::new(Int32ArrayDecoder::new(iter, present))
289289
}
290290
(DataType::Long { .. }, ArrowDataType::Int64) => {
291291
let iter = stripe.stream_map().get(column, Kind::Data);
292-
let iter = get_rle_reader(column, iter)?;
292+
let iter = get_signed_int_decoder(iter, column.rle_version());
293293
let present = PresentDecoder::from_stripe(stripe, column);
294294
Box::new(Int64ArrayDecoder::new(iter, present))
295295
}
@@ -315,7 +315,7 @@ pub fn array_decoder_factory(
315315
},
316316
ArrowDataType::Decimal128(a_precision, a_scale),
317317
) if *precision as u8 == *a_precision && *scale as i8 == *a_scale => {
318-
new_decimal_decoder(column, stripe, *precision, *scale)?
318+
new_decimal_decoder(column, stripe, *precision, *scale)
319319
}
320320
(DataType::Timestamp { .. }, field_type) => {
321321
new_timestamp_decoder(column, field_type.clone(), stripe)?
@@ -326,7 +326,7 @@ pub fn array_decoder_factory(
326326
(DataType::Date { .. }, ArrowDataType::Date32) => {
327327
// TODO: allow Date64
328328
let iter = stripe.stream_map().get(column, Kind::Data);
329-
let iter = get_rle_reader(column, iter)?;
329+
let iter = get_signed_int_decoder(iter, column.rle_version());
330330
let present = PresentDecoder::from_stripe(stripe, column);
331331
Box::new(DateArrayDecoder::new(iter, present))
332332
}
@@ -433,17 +433,13 @@ impl NaiveStripeDecoder {
433433
}
434434

435435
pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
436-
let mut decoders = Vec::with_capacity(stripe.columns().len());
437436
let number_of_rows = stripe.number_of_rows();
438-
439-
for (col, field) in stripe
437+
let decoders = stripe
440438
.columns()
441439
.iter()
442-
.zip(schema_ref.fields.iter().cloned())
443-
{
444-
let decoder = array_decoder_factory(col, field, &stripe)?;
445-
decoders.push(decoder);
446-
}
440+
.zip(schema_ref.fields.iter())
441+
.map(|(col, field)| array_decoder_factory(col, field.data_type(), &stripe))
442+
.collect::<Result<Vec<_>>>()?;
447443

448444
Ok(Self {
449445
stripe,

src/array_decoder/string.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use snafu::ResultExt;
2828
use crate::array_decoder::derive_present_vec;
2929
use crate::column::Column;
3030
use crate::compression::Decompressor;
31-
use crate::encoding::integer::get_unsigned_rle_reader;
31+
use crate::encoding::integer::get_unsigned_int_decoder;
3232
use crate::encoding::PrimitiveValueDecoder;
3333
use crate::error::{ArrowSnafu, IoSnafu, Result};
3434
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
@@ -42,7 +42,7 @@ pub fn new_binary_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn Ar
4242
let present = PresentDecoder::from_stripe(stripe, column);
4343

4444
let lengths = stripe.stream_map().get(column, Kind::Length);
45-
let lengths = get_unsigned_rle_reader(column, lengths);
45+
let lengths = get_unsigned_int_decoder(lengths, column.rle_version());
4646

4747
let bytes = Box::new(stripe.stream_map().get(column, Kind::Data));
4848
Ok(Box::new(BinaryArrayDecoder::new(bytes, lengths, present)))
@@ -53,7 +53,7 @@ pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn Ar
5353
let present = PresentDecoder::from_stripe(stripe, column);
5454

5555
let lengths = stripe.stream_map().get(column, Kind::Length);
56-
let lengths = get_unsigned_rle_reader(column, lengths);
56+
let lengths = get_unsigned_int_decoder(lengths, column.rle_version());
5757

5858
match kind {
5959
ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => {
@@ -72,7 +72,7 @@ pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn Ar
7272
let dictionary_strings = Arc::new(dictionary_strings);
7373

7474
let indexes = stripe.stream_map().get(column, Kind::Data);
75-
let indexes = get_unsigned_rle_reader(column, indexes);
75+
let indexes = get_unsigned_int_decoder(indexes, column.rle_version());
7676
let indexes = Int64ArrayDecoder::new(indexes, present);
7777

7878
Ok(Box::new(DictionaryStringArrayDecoder::new(

src/array_decoder/struct_decoder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ impl StructArrayDecoder {
4343
let decoders = column
4444
.children()
4545
.iter()
46-
.zip(fields.iter().cloned())
47-
.map(|(child, field)| array_decoder_factory(child, field, stripe))
46+
.zip(fields.iter())
47+
.map(|(child, field)| array_decoder_factory(child, field.data_type(), stripe))
4848
.collect::<Result<Vec<_>>>()?;
4949

5050
Ok(Self {

0 commit comments

Comments
 (0)