Skip to content

Commit c241260

Browse files
authored
feat: Support streaming IPC scan from S3 object store (#25868)
1 parent bd25d66 commit c241260

File tree

12 files changed

+1166
-670
lines changed

12 files changed

+1166
-670
lines changed

crates/polars-arrow/src/io/ipc/read/file.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct FileMetadata {
3333
pub blocks: Vec<arrow_format::ipc::Block>,
3434

3535
/// Dictionaries associated to each dict_id
36-
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
36+
pub dictionaries: Option<Vec<arrow_format::ipc::Block>>,
3737

3838
/// The total size of the file in bytes
3939
pub size: u64,
@@ -78,27 +78,34 @@ pub(crate) fn get_dictionary_batch<'a>(
7878
}
7979
}
8080

81-
fn read_dictionary_block<R: Read + Seek>(
81+
pub fn read_dictionary_block<R: Read + Seek>(
8282
reader: &mut R,
8383
metadata: &FileMetadata,
8484
block: &arrow_format::ipc::Block,
85+
// When true, the underlying reader bytestream represents a standalone IPC Block
86+
// rather than a complete IPC File.
87+
force_zero_offset: bool,
8588
dictionaries: &mut Dictionaries,
8689
message_scratch: &mut Vec<u8>,
8790
dictionary_scratch: &mut Vec<u8>,
8891
) -> PolarsResult<()> {
89-
let message = get_message_from_block(reader, block, message_scratch)?;
90-
let batch = get_dictionary_batch(&message)?;
91-
92-
let offset: u64 = block
93-
.offset
94-
.try_into()
95-
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
92+
let offset: u64 = if force_zero_offset {
93+
0
94+
} else {
95+
block
96+
.offset
97+
.try_into()
98+
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?
99+
};
96100

97101
let length: u64 = block
98102
.meta_data_length
99103
.try_into()
100104
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
101105

106+
let message = get_message_from_block_offset(reader, offset, message_scratch)?;
107+
let batch = get_dictionary_batch(&message)?;
108+
102109
read_dictionary(
103110
batch,
104111
&metadata.schema,
@@ -132,6 +139,7 @@ pub fn read_file_dictionaries<R: Read + Seek>(
132139
reader,
133140
metadata,
134141
block,
142+
false,
135143
&mut dictionaries,
136144
&mut message_scratch,
137145
scratch,
@@ -281,19 +289,19 @@ pub(crate) fn get_record_batch(
281289
}
282290
}
283291

284-
fn get_message_from_block_offset<'a, R: Read + Seek>(
292+
pub fn get_message_from_block_offset<'a, R: Read + Seek>(
285293
reader: &mut R,
286294
offset: u64,
287295
message_scratch: &'a mut Vec<u8>,
288296
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
289-
// read length
290297
reader.seek(SeekFrom::Start(offset))?;
291298
let mut meta_buf = [0; 4];
292299
reader.read_exact(&mut meta_buf)?;
293300
if meta_buf == CONTINUATION_MARKER {
294301
// continuation marker encountered, read message next
295302
reader.read_exact(&mut meta_buf)?;
296303
}
304+
297305
let meta_len = i32::from_le_bytes(meta_buf)
298306
.try_into()
299307
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
@@ -337,15 +345,21 @@ pub fn read_batch<R: Read + Seek>(
337345
projection: Option<&[usize]>,
338346
limit: Option<usize>,
339347
index: usize,
348+
// When true, the reader object is handled as an IPC Block.
349+
block_mode: bool,
340350
message_scratch: &mut Vec<u8>,
341351
data_scratch: &mut Vec<u8>,
342352
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
343353
let block = metadata.blocks[index];
344354

345-
let offset: u64 = block
346-
.offset
347-
.try_into()
348-
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
355+
let offset: u64 = if block_mode {
356+
0
357+
} else {
358+
block
359+
.offset
360+
.try_into()
361+
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?
362+
};
349363

350364
let length: u64 = block
351365
.meta_data_length

crates/polars-arrow/src/io/ipc/read/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
//! APIs to read Arrow's IPC format.
22
//!
3-
//! The two important structs here are the [`FileReader`](reader::FileReader),
3+
//! The two important File-based structs here are the [`FileReader`](reader::FileReader),
44
//! which provides arbitrary access to any of its messages, and the
55
//! [`StreamReader`](stream::StreamReader), which only supports reading
66
//! data in the order it was written in.
7+
//! In addition, there is a Block-based struct [`BlockReader`](reader::BlockReader), which
8+
//! enables random access to a standalone IPC Block.
79
use crate::array::Array;
810

911
mod array;
10-
mod common;
12+
pub mod common;
1113
mod deserialize;
1214
mod error;
1315
pub(crate) mod file;
@@ -23,10 +25,10 @@ pub use common::{ProjectionInfo, prepare_projection};
2325
pub use error::OutOfSpecKind;
2426
pub use file::{
2527
FileMetadata, deserialize_footer, get_row_count, get_row_count_from_blocks, read_batch,
26-
read_file_dictionaries, read_file_metadata,
28+
read_dictionary_block, read_file_dictionaries, read_file_metadata,
2729
};
2830
use polars_utils::aliases::PlHashMap;
29-
pub use reader::FileReader;
31+
pub use reader::{BlockReader, FileReader};
3032
pub use schema::deserialize_schema;
3133
pub use stream::{StreamMetadata, StreamReader, StreamState, read_stream_metadata};
3234

crates/polars-arrow/src/io/ipc/read/reader.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::io::{Read, Seek};
33
use polars_error::PolarsResult;
44

55
use super::common::*;
6-
use super::file::{get_message_from_block, get_record_batch};
6+
use super::file::{get_message_from_block, get_message_from_block_offset, get_record_batch};
77
use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};
88
use crate::array::Array;
99
use crate::datatypes::ArrowSchema;
@@ -114,7 +114,7 @@ impl<R: Read + Seek> FileReader<R> {
114114
(self.data_scratch, self.message_scratch) = scratches;
115115
}
116116

117-
fn read_dictionaries(&mut self) -> PolarsResult<()> {
117+
pub fn read_dictionaries(&mut self) -> PolarsResult<()> {
118118
if self.dictionaries.is_none() {
119119
self.dictionaries = Some(read_file_dictionaries(
120120
&mut self.reader,
@@ -187,6 +187,7 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
187187
self.projection.as_ref().map(|x| x.columns.as_ref()),
188188
Some(self.remaining),
189189
block,
190+
false,
190191
&mut self.message_scratch,
191192
&mut self.data_scratch,
192193
);
@@ -201,3 +202,27 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
201202
Some(chunk)
202203
}
203204
}
205+
206+
/// A reader that has access to exactly one standalone IPC Block of an Arrow IPC file.
207+
/// The block contains either a `RecordBatch` or a `DictionaryBatch`.
208+
/// The `dictionaries` field must be initialized prior to decoding a `RecordBatch`.
209+
pub struct BlockReader<R: Read + Seek> {
210+
pub reader: R,
211+
}
212+
213+
impl<R: Read + Seek> BlockReader<R> {
214+
pub fn new(reader: R) -> Self {
215+
Self { reader }
216+
}
217+
218+
/// Reads the record batch header and returns its length (i.e., number of rows).
219+
pub fn record_batch_num_rows(&mut self, message_scratch: &mut Vec<u8>) -> PolarsResult<usize> {
220+
let offset: u64 = 0;
221+
222+
let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
223+
let batch = get_record_batch(message)?;
224+
225+
let out = batch.length().map(|l| usize::try_from(l).unwrap())?;
226+
Ok(out)
227+
}
228+
}

0 commit comments

Comments
 (0)