Skip to content

Commit 7d9b9ab

Browse files
authored
test: recordbatch+row ordering (#1237)
1 parent 506527d commit 7d9b9ab

File tree

1 file changed

+238
-2
lines changed

1 file changed

+238
-2
lines changed

src/parseable/staging/reader.rs

+238-2
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,24 @@ fn find_limit_and_type(
321321

322322
#[cfg(test)]
323323
mod tests {
324-
use std::{io::Cursor, sync::Arc};
324+
use std::{
325+
fs::File,
326+
io::{self, Cursor, Read},
327+
path::Path,
328+
sync::Arc,
329+
};
325330

326331
use arrow_array::{
327-
cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
332+
cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch,
333+
StringArray,
328334
};
329335
use arrow_ipc::writer::{
330336
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
331337
};
338+
use arrow_schema::{DataType, Field, Schema};
339+
use temp_dir::TempDir;
340+
341+
use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
332342

333343
use super::get_reverse_reader;
334344

@@ -442,4 +452,230 @@ mod tests {
442452

443453
assert_eq!(sum, 10000);
444454
}
455+
456+
// Helper function to create test record batches
457+
fn create_test_batches(schema: &Arc<Schema>, count: usize) -> Vec<RecordBatch> {
458+
let mut batches = Vec::with_capacity(count);
459+
460+
for batch_num in 1..=count as i32 {
461+
let id_array = Int32Array::from_iter(batch_num * 10..=batch_num * 10 + 1);
462+
let name_array = StringArray::from(vec![
463+
format!("Name {batch_num}-1"),
464+
format!("Name {batch_num}-2"),
465+
]);
466+
467+
let batch = RecordBatch::try_new(
468+
schema.clone(),
469+
vec![Arc::new(id_array), Arc::new(name_array)],
470+
)
471+
.expect("Failed to create test batch");
472+
473+
batches.push(batch);
474+
}
475+
476+
batches
477+
}
478+
479+
// Helper function to write batches to a file
480+
fn write_test_batches(
481+
path: &Path,
482+
schema: &Arc<Schema>,
483+
batches: &[RecordBatch],
484+
) -> io::Result<()> {
485+
let file = File::create(path)?;
486+
let mut writer =
487+
StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
488+
489+
for batch in batches {
490+
writer.write(batch).expect("Failed to write batch");
491+
}
492+
493+
writer.finish().expect("Failed to finalize writer");
494+
Ok(())
495+
}
496+
497+
#[test]
fn test_offset_reader() {
    // Backing store holds the bytes 1..=10.
    let src = Cursor::new((1u8..=10).collect::<Vec<_>>());

    // Two sections: 3 bytes at offset 2 (values 3, 4, 5), then 2 bytes at
    // offset 7 (values 8, 9).
    let mut reader = OffsetReader::new(src, vec![(2, 3), (7, 2)]);
    let mut buf = [0u8; 10];

    // First read returns the whole first section.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 3);
    assert_eq!(&buf[..n], &[3, 4, 5]);

    // Second read returns the whole second section.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 2);
    assert_eq!(&buf[..n], &[8, 9]);

    // Sections exhausted: EOF.
    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}
523+
524+
#[test]
fn test_merged_reverse_record_reader() -> io::Result<()> {
    let dir = TempDir::new().unwrap();
    let file_path = dir.path().join("test.arrow");

    // Two-column schema: monotonically increasing ids plus a name column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Persist three two-row batches, then read them back through the
    // reverse reader.
    write_test_batches(&file_path, &schema, &create_test_batches(&schema, 3))?;
    let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None);

    // Pulls the next batch and returns its pair of id values.
    let mut next_ids = || -> (i32, i32) {
        let batch = reader.next().expect("Failed to read batch");
        assert_eq!(batch.num_rows(), 2);
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        (ids.value(0), ids.value(1))
    };

    // Batches arrive newest-first (3, 2, 1), and the rows *within* each
    // batch are reversed as well — hence (31, 30) rather than (30, 31).
    assert_eq!(next_ids(), (31, 30));
    assert_eq!(next_ids(), (21, 20));
    assert_eq!(next_ids(), (11, 10));

    // Stream is exhausted after the three batches.
    assert!(reader.next().is_none());

    Ok(())
}
585+
586+
#[test]
fn test_empty_offset_list() {
    // With no sections registered, the reader is immediately at EOF even
    // though the underlying cursor has data.
    let mut reader = OffsetReader::new(Cursor::new(vec![1, 2, 3, 4, 5]), Vec::new());
    let mut buf = [0u8; 10];

    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}
599+
600+
#[test]
fn test_partial_reads() {
    // One 5-byte section (offset 2 => values 3..=7) read through a 3-byte
    // buffer, forcing the section to be consumed across two calls.
    let mut reader = OffsetReader::new(
        Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
        vec![(2, 5)],
    );
    let mut buf = [0u8; 3];

    // First call fills the whole buffer: 3, 4, 5.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 3);
    assert_eq!(&buf[..n], &[3, 4, 5]);

    // Second call drains the remainder of the section: 6, 7.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 2);
    assert_eq!(&buf[..n], &[6, 7]);

    // Nothing left after the section is consumed.
    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}
626+
627+
#[test]
fn test_get_reverse_reader_single_message() -> io::Result<()> {
    let dir = TempDir::new().unwrap();
    let file_path = dir.path().join("test_single.arrow");

    // A single one-row batch: id = 42.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![42]))])
        .expect("Failed to create batch");
    write_test_batches(&file_path, &schema, &[batch])?;

    let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None);

    // Reversing a single one-row batch is a no-op: 42 comes back, then EOF.
    let out = reader.next().expect("Failed to read batch");
    let ids = out
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ids.value(0), 42);

    assert!(reader.next().is_none());

    Ok(())
}
659+
660+
#[test]
fn test_large_buffer_resizing() {
    // Verifies the reader's internal buffer grows to cover a large section.
    let data = vec![1u8; 10000]; // 10 KB of the fill value 1
    let cursor = Cursor::new(data);

    // One large 8 KB section starting at offset 1000.
    let offsets = vec![(1000, 8000)];

    let mut reader = OffsetReader::new(cursor, offsets);
    let mut buffer = [0u8; 10000];

    // The whole 8 KB section should come back in a single read.
    let read_bytes = reader.read(&mut buffer).unwrap();
    assert_eq!(read_bytes, 8000);

    // Every returned byte must equal the fill value. A slice iterator
    // replaces the original index loop (clippy::needless_range_loop) —
    // same assertion, no per-element indexing.
    assert!(buffer[..read_bytes].iter().all(|&b| b == 1));
}
445681
}

0 commit comments

Comments
 (0)