Skip to content

Commit 521e1b8

Browse files
committed
test(row_id): add unit coverage for edge cases and the SinglePartition guard
Four additional unit tests on RowIdExec:

* rowid_only_projection_empty_input_columns — zero-column input (count-rows mode produced by parquet scans where only rowid is projected). Verifies RowIdExec still emits a 1-column batch with the right values.
* empty_batch_passes_through_with_empty_rowid_column — a zero-row batch followed by a non-empty one. Confirms the rowid column is present in the empty batch and that the cursor doesn't advance, so the next batch's rowids start at row_id_start + 0.
* insert_at_out_of_range_clamps_to_end — the constructor clamps a too-large insert position to the end of the input schema (a defense against caller errors, rather than panicking).
* declares_single_partition_input_to_block_repartition — regression guard for the latent issue fixed in the previous commit. If required_input_distribution or maintains_input_order ever revert to the ExecutionPlan defaults, DataFusion's Repartition rule could silently break the cursor-based rowid synthesis.
1 parent b2648c4 commit 521e1b8

1 file changed

Lines changed: 122 additions & 0 deletions

File tree

src/row_id.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,128 @@ mod tests {
350350
assert_eq!(b, vec![100, 200]);
351351
}
352352

353+
#[tokio::test]
354+
async fn rowid_only_projection_empty_input_columns() {
355+
// The input has zero columns (count-rows-only mode). RowIdExec
356+
// should still emit a single-column batch with the rowid values.
357+
// This is the shape that flows from a parquet scan when only rowid
358+
// is in the projection.
359+
let input_schema = Arc::new(Schema::new(Vec::<Field>::new()));
360+
let batch = RecordBatch::try_new_with_options(
361+
input_schema.clone(),
362+
vec![],
363+
&arrow::record_batch::RecordBatchOptions::new().with_row_count(Some(3)),
364+
)
365+
.unwrap();
366+
let mem =
367+
MemorySourceConfig::try_new_exec(&[vec![batch]], input_schema.clone(), None)
368+
.unwrap();
369+
370+
let exec = Arc::new(RowIdExec::new(mem, Some(42)));
371+
assert_eq!(exec.schema().fields().len(), 1);
372+
assert_eq!(exec.schema().field(0).name(), ROWID_COLUMN_NAME);
373+
374+
let ctx = Arc::new(TaskContext::default());
375+
let mut stream = exec.execute(0, ctx).unwrap();
376+
use futures::StreamExt;
377+
let out = stream.next().await.unwrap().unwrap();
378+
assert_eq!(out.num_rows(), 3);
379+
assert_eq!(out.num_columns(), 1);
380+
let rowids = out
381+
.column(0)
382+
.as_any()
383+
.downcast_ref::<Int64Array>()
384+
.unwrap()
385+
.values()
386+
.to_vec();
387+
assert_eq!(rowids, vec![42, 43, 44]);
388+
}
389+
390+
#[tokio::test]
391+
async fn empty_batch_passes_through_with_empty_rowid_column() {
392+
// A zero-row batch from the input should produce a zero-row batch
393+
// out, with the rowid column present and the cursor unchanged so
394+
// the next non-empty batch picks up at the right offset.
395+
let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
396+
let empty_batch = RecordBatch::try_new(
397+
input_schema.clone(),
398+
vec![Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef],
399+
)
400+
.unwrap();
401+
let next_batch = small_batch(input_schema.clone(), &[7, 8]);
402+
let mem = MemorySourceConfig::try_new_exec(
403+
&[vec![empty_batch, next_batch]],
404+
input_schema.clone(),
405+
None,
406+
)
407+
.unwrap();
408+
409+
let exec = Arc::new(RowIdExec::new(mem, Some(100)));
410+
let ctx = Arc::new(TaskContext::default());
411+
let mut stream = exec.execute(0, ctx).unwrap();
412+
use futures::StreamExt;
413+
414+
let first = stream.next().await.unwrap().unwrap();
415+
assert_eq!(first.num_rows(), 0);
416+
assert_eq!(first.num_columns(), 2);
417+
418+
let second = stream.next().await.unwrap().unwrap();
419+
let rowids = second
420+
.column(1)
421+
.as_any()
422+
.downcast_ref::<Int64Array>()
423+
.unwrap()
424+
.values()
425+
.to_vec();
426+
// Cursor should NOT have advanced past the empty batch — second
427+
// batch rows still start at row_id_start + 0.
428+
assert_eq!(rowids, vec![100, 101]);
429+
}
430+
431+
#[test]
fn insert_at_out_of_range_clamps_to_end() {
    // The caller asks for insert position 99 although the input has only
    // one column. The constructor is expected to clamp the position to the
    // input's column count (= 1), so the rowid field ends up appended after
    // the existing column instead of causing a panic.
    //
    // Only the plan's schema is inspected here; no stream is executed and
    // nothing is awaited, so this is a plain synchronous test that does not
    // need an async runtime.
    let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let b = small_batch(input_schema.clone(), &[1, 2]);
    let mem =
        MemorySourceConfig::try_new_exec(&[vec![b]], input_schema.clone(), None).unwrap();

    let exec = Arc::new(RowIdExec::new_at(mem, Some(10), 99));
    let schema = exec.schema();

    // Exactly one field was added, and it is the last one.
    assert_eq!(schema.fields().len(), 2);
    assert_eq!(schema.field(0).name(), "v");
    assert_eq!(schema.field(1).name(), ROWID_COLUMN_NAME);
}
446+
447+
#[test]
fn declares_single_partition_input_to_block_repartition() {
    // Guards against the cursor-vs-RepartitionExec hazard. Should either
    // property checked below fall back to its default, DataFusion's
    // optimizer would be allowed to place a RepartitionExec beneath
    // RowIdExec, silently corrupting rowid computation. The check is kept
    // deliberately loose: the only requirement is that the single child
    // is declared SinglePartition.
    let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let source = MemorySourceConfig::try_new_exec(&[vec![]], input_schema, None).unwrap();
    let plan = RowIdExec::new(source, Some(0));

    // One child, so exactly one distribution requirement.
    let required = plan.required_input_distribution();
    assert_eq!(required.len(), 1);

    let child_requirement = &required[0];
    assert!(
        matches!(child_requirement, Distribution::SinglePartition),
        "RowIdExec must require a single-partition input; got {:?}",
        child_requirement
    );

    // Row order passes through unchanged and has to be advertised as such.
    let order_flags = plan.maintains_input_order();
    assert_eq!(
        order_flags,
        vec![true],
        "RowIdExec preserves row order — must declare it so DataFusion's \
         order-aware optimizations can fire",
    );
}
474+
353475
#[tokio::test]
354476
async fn emits_null_when_row_id_start_is_none() {
355477
let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));

0 commit comments

Comments
 (0)