Skip to content

Commit 5db09e8

Browse files
Subham SinghalSubham Singhal
authored andcommitted
Fix lint
1 parent 2d95832 commit 5db09e8

5 files changed

Lines changed: 122 additions & 77 deletions

File tree

datafusion/core/tests/parquet/field_id.rs

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Integration tests for Parquet field ID support
1919
2020
use arrow::array::{
21-
Array, Int32Array, Int64Array, RecordBatch, StringArray, StringViewArray
21+
Array, Int32Array, Int64Array, RecordBatch, StringArray, StringViewArray,
2222
};
2323
use arrow::datatypes::{DataType, Field, Schema};
2424
use datafusion::prelude::*;
@@ -97,9 +97,9 @@ async fn test_read_parquet_with_field_ids_enabled() -> Result<()> {
9797
ctx.register_parquet(
9898
"test",
9999
file_path.to_str().unwrap(),
100-
ParquetReadOptions::default()
100+
ParquetReadOptions::default(),
101101
)
102-
.await?;
102+
.await?;
103103

104104
let df = ctx.sql("SELECT user_id, amount, name FROM test").await?;
105105
let results = df.collect().await?;
@@ -144,11 +144,12 @@ async fn test_read_parquet_with_field_ids_disabled() -> Result<()> {
144144
// Create context with field ID reading disabled (default)
145145
let ctx = SessionContext::new();
146146

147-
ctx.register_parquet("test",
148-
file_path.to_str().unwrap(),
149-
ParquetReadOptions::default()
147+
ctx.register_parquet(
148+
"test",
149+
file_path.to_str().unwrap(),
150+
ParquetReadOptions::default(),
150151
)
151-
.await?;
152+
.await?;
152153

153154
let df = ctx.sql("SELECT user_id, amount FROM test").await?;
154155
let results = df.collect().await?;
@@ -193,11 +194,12 @@ async fn test_schema_evolution_renamed_columns() -> Result<()> {
193194
.await?;
194195

195196
// Register table with original names
196-
ctx.register_parquet("test",
197-
file_path.to_str().unwrap(),
198-
ParquetReadOptions::default()
197+
ctx.register_parquet(
198+
"test",
199+
file_path.to_str().unwrap(),
200+
ParquetReadOptions::default(),
199201
)
200-
.await?;
202+
.await?;
201203

202204
// Query should work with original names
203205
let df = ctx.sql("SELECT user_id, amount FROM test").await?;
@@ -248,11 +250,12 @@ async fn test_schema_evolution_reordered_columns() -> Result<()> {
248250
.collect()
249251
.await?;
250252

251-
ctx.register_parquet("test",
252-
file_path.to_str().unwrap(),
253-
ParquetReadOptions::default()
253+
ctx.register_parquet(
254+
"test",
255+
file_path.to_str().unwrap(),
256+
ParquetReadOptions::default(),
254257
)
255-
.await?;
258+
.await?;
256259

257260
// Query columns in different order: c, a, b
258261
let df = ctx.sql("SELECT c, a, b FROM test").await?;
@@ -315,10 +318,12 @@ async fn test_projection_with_field_ids() -> Result<()> {
315318
.collect()
316319
.await?;
317320

318-
ctx.register_parquet("test",
319-
file_path.to_str().unwrap(),
320-
ParquetReadOptions::default()
321-
).await?;
321+
ctx.register_parquet(
322+
"test",
323+
file_path.to_str().unwrap(),
324+
ParquetReadOptions::default(),
325+
)
326+
.await?;
322327

323328
// Project only columns a and c
324329
let df = ctx.sql("SELECT a, c FROM test").await?;
@@ -370,14 +375,17 @@ async fn test_filter_with_field_ids() -> Result<()> {
370375
.collect()
371376
.await?;
372377

373-
ctx.register_parquet("test",
374-
file_path.to_str().unwrap(),
375-
ParquetReadOptions::default()
378+
ctx.register_parquet(
379+
"test",
380+
file_path.to_str().unwrap(),
381+
ParquetReadOptions::default(),
376382
)
377-
.await?;
383+
.await?;
378384

379385
// Filter with field IDs
380-
let df = ctx.sql("SELECT id, value FROM test WHERE value > 25").await?;
386+
let df = ctx
387+
.sql("SELECT id, value FROM test WHERE value > 25")
388+
.await?;
381389
let results = df.collect().await?;
382390

383391
assert_eq!(results.len(), 1);
@@ -424,9 +432,9 @@ async fn test_aggregation_with_field_ids() -> Result<()> {
424432
ctx.register_parquet(
425433
"test",
426434
file_path.to_str().unwrap(),
427-
ParquetReadOptions::default()
435+
ParquetReadOptions::default(),
428436
)
429-
.await?;
437+
.await?;
430438

431439
// Aggregate with field IDs
432440
let df = ctx
@@ -440,25 +448,24 @@ async fn test_aggregation_with_field_ids() -> Result<()> {
440448
// Get category column - it might be StringArray or StringViewArray depending on config
441449
let category_col = results[0].column(0);
442450
let categories: Vec<&str> = match category_col.data_type() {
443-
DataType::Utf8 => {
444-
category_col
445-
.as_any()
446-
.downcast_ref::<StringArray>()
447-
.unwrap()
448-
.iter()
449-
.map(|v| v.unwrap())
450-
.collect()
451-
}
452-
DataType::Utf8View => {
453-
category_col
454-
.as_any()
455-
.downcast_ref::<StringViewArray>()
456-
.unwrap()
457-
.iter()
458-
.map(|v| v.unwrap())
459-
.collect()
460-
}
461-
_ => panic!("Unexpected data type for category column: {:?}", category_col.data_type()),
451+
DataType::Utf8 => category_col
452+
.as_any()
453+
.downcast_ref::<StringArray>()
454+
.unwrap()
455+
.iter()
456+
.map(|v| v.unwrap())
457+
.collect(),
458+
DataType::Utf8View => category_col
459+
.as_any()
460+
.downcast_ref::<StringViewArray>()
461+
.unwrap()
462+
.iter()
463+
.map(|v| v.unwrap())
464+
.collect(),
465+
_ => panic!(
466+
"Unexpected data type for category column: {:?}",
467+
category_col.data_type()
468+
),
462469
};
463470

464471
let totals = results[0]
@@ -504,8 +511,12 @@ async fn test_fallback_to_name_when_no_field_ids() -> Result<()> {
504511
.collect()
505512
.await?;
506513

507-
ctx.register_parquet("test", file_path.to_str().unwrap(), ParquetReadOptions::default())
508-
.await?;
514+
ctx.register_parquet(
515+
"test",
516+
file_path.to_str().unwrap(),
517+
ParquetReadOptions::default(),
518+
)
519+
.await?;
509520

510521
// Should fall back to name-based matching
511522
let df = ctx.sql("SELECT user_id, amount FROM test").await?;

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -345,9 +345,11 @@ impl<'a> DFParquetMetadata<'a> {
345345
)?;
346346

347347
// Apply type coercions without field ID matching (statistics use name-based matching)
348-
if let Some(merged) =
349-
apply_file_schema_type_coercions(logical_file_schema, &physical_file_schema, false)
350-
{
348+
if let Some(merged) = apply_file_schema_type_coercions(
349+
logical_file_schema,
350+
&physical_file_schema,
351+
false,
352+
) {
351353
physical_file_schema = merged;
352354
}
353355

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,8 @@ impl FileOpener for ParquetOpener {
639639
)
640640
})?
641641
} else {
642-
projection.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?
642+
projection
643+
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?
643644
};
644645

645646
let projector = projection.make_projector(&stream_schema)?;

datafusion/datasource-parquet/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,8 @@ impl FileSource for ParquetSource {
541541
.as_ref()
542542
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
543543

544-
let field_id_read_enabled = self.table_parquet_options.global.field_id_read_enabled;
544+
let field_id_read_enabled =
545+
self.table_parquet_options.global.field_id_read_enabled;
545546

546547
let opener = Arc::new(ParquetOpener {
547548
partition_index: partition,

datafusion/physical-expr/src/utils/mod.rs

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -686,8 +686,10 @@ pub(crate) mod tests {
686686
metadata2.insert("PARQUET:field_id".to_string(), "2".to_string());
687687

688688
let source_schema = Schema::new(vec![
689-
Field::new("user_id", DataType::Int64, false).with_metadata(metadata1.clone()),
690-
Field::new("amount", DataType::Float64, false).with_metadata(metadata2.clone()),
689+
Field::new("user_id", DataType::Int64, false)
690+
.with_metadata(metadata1.clone()),
691+
Field::new("amount", DataType::Float64, false)
692+
.with_metadata(metadata2.clone()),
691693
]);
692694

693695
// Target schema: renamed columns but same field IDs
@@ -698,10 +700,16 @@ pub(crate) mod tests {
698700

699701
// Should match by field ID, not name
700702
let index = find_field_index("user_id", &source_schema, &target_schema)?;
701-
assert_eq!(index, 0, "user_id (field_id=1) should match customer_id at index 0");
703+
assert_eq!(
704+
index, 0,
705+
"user_id (field_id=1) should match customer_id at index 0"
706+
);
702707

703708
let index = find_field_index("amount", &source_schema, &target_schema)?;
704-
assert_eq!(index, 1, "amount (field_id=2) should match price at index 1");
709+
assert_eq!(
710+
index, 1,
711+
"amount (field_id=2) should match price at index 1"
712+
);
705713

706714
Ok(())
707715
}
@@ -752,15 +760,20 @@ pub(crate) mod tests {
752760
]);
753761

754762
// Should fall back to name-based matching
755-
assert_eq!(find_field_index("user_id", &source_schema, &target_schema)?, 0);
756-
assert_eq!(find_field_index("amount", &source_schema, &target_schema)?, 1);
763+
assert_eq!(
764+
find_field_index("user_id", &source_schema, &target_schema)?,
765+
0
766+
);
767+
assert_eq!(
768+
find_field_index("amount", &source_schema, &target_schema)?,
769+
1
770+
);
757771

758772
Ok(())
759773
}
760774

761775
#[test]
762776
fn test_find_field_index_mixed_field_ids() -> Result<()> {
763-
764777
// Source schema: some fields have IDs, some don't
765778
let mut metadata1 = HashMap::new();
766779
metadata1.insert("PARQUET:field_id".to_string(), "1".to_string());
@@ -786,13 +799,9 @@ pub(crate) mod tests {
786799

787800
#[test]
788801
fn test_find_field_index_not_found() {
789-
let source_schema = Schema::new(vec![
790-
Field::new("a", DataType::Int64, false),
791-
]);
802+
let source_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
792803

793-
let target_schema = Schema::new(vec![
794-
Field::new("b", DataType::Int64, false),
795-
]);
804+
let target_schema = Schema::new(vec![Field::new("b", DataType::Int64, false)]);
796805

797806
// Should fail to find non-existent field
798807
let result = find_field_index("a", &source_schema, &target_schema);
@@ -801,7 +810,6 @@ pub(crate) mod tests {
801810

802811
#[test]
803812
fn test_reassign_expr_columns_with_field_ids_simple() -> Result<()> {
804-
805813
// Source schema: full file schema
806814
let mut meta1 = HashMap::new();
807815
meta1.insert("PARQUET:field_id".to_string(), "1".to_string());
@@ -826,18 +834,22 @@ pub(crate) mod tests {
826834
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("age", 2));
827835

828836
// After transformation, should reference age at index 1 in target schema
829-
let result = reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
837+
let result =
838+
reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
830839

831840
let column = result.as_any().downcast_ref::<Column>().unwrap();
832841
assert_eq!(column.name(), "age");
833-
assert_eq!(column.index(), 1, "age should be at index 1 in target schema");
842+
assert_eq!(
843+
column.index(),
844+
1,
845+
"age should be at index 1 in target schema"
846+
);
834847

835848
Ok(())
836849
}
837850

838851
#[test]
839852
fn test_reassign_expr_columns_with_field_ids_complex() -> Result<()> {
840-
841853
// Source schema
842854
let mut meta1 = HashMap::new();
843855
meta1.insert("PARQUET:field_id".to_string(), "1".to_string());
@@ -867,27 +879,39 @@ pub(crate) mod tests {
867879
)?;
868880

869881
// After transformation: a@0 + c@1
870-
let result = reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
882+
let result =
883+
reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
871884

872885
// Verify it's still a binary expression
873886
let binary_expr = result.as_any().downcast_ref::<BinaryExpr>().unwrap();
874887

875888
// Check left side (a)
876-
let left_col = binary_expr.left().as_any().downcast_ref::<Column>().unwrap();
889+
let left_col = binary_expr
890+
.left()
891+
.as_any()
892+
.downcast_ref::<Column>()
893+
.unwrap();
877894
assert_eq!(left_col.name(), "a");
878895
assert_eq!(left_col.index(), 0);
879896

880897
// Check right side (c)
881-
let right_col = binary_expr.right().as_any().downcast_ref::<Column>().unwrap();
898+
let right_col = binary_expr
899+
.right()
900+
.as_any()
901+
.downcast_ref::<Column>()
902+
.unwrap();
882903
assert_eq!(right_col.name(), "c");
883-
assert_eq!(right_col.index(), 1, "c should be remapped from index 2 to 1");
904+
assert_eq!(
905+
right_col.index(),
906+
1,
907+
"c should be remapped from index 2 to 1"
908+
);
884909

885910
Ok(())
886911
}
887912

888913
#[test]
889914
fn test_reassign_expr_columns_with_field_ids_renamed_columns() -> Result<()> {
890-
891915
// Source schema (file schema with old names)
892916
let mut meta1 = HashMap::new();
893917
meta1.insert("PARQUET:field_id".to_string(), "1".to_string());
@@ -909,11 +933,16 @@ pub(crate) mod tests {
909933
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("user_id", 0));
910934

911935
// After transformation, should still reference by old name but correct index
912-
let result = reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
936+
let result =
937+
reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
913938

914939
let column = result.as_any().downcast_ref::<Column>().unwrap();
915940
assert_eq!(column.name(), "user_id", "Name should remain user_id");
916-
assert_eq!(column.index(), 0, "Should match customer_id at index 0 via field_id");
941+
assert_eq!(
942+
column.index(),
943+
0,
944+
"Should match customer_id at index 0 via field_id"
945+
);
917946

918947
Ok(())
919948
}
@@ -936,7 +965,8 @@ pub(crate) mod tests {
936965
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 2));
937966

938967
// Should fall back to name-based matching
939-
let result = reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
968+
let result =
969+
reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?;
940970

941971
let column = result.as_any().downcast_ref::<Column>().unwrap();
942972
assert_eq!(column.name(), "c");

0 commit comments

Comments
 (0)