Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-federation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ async-trait.workspace = true
datafusion.workspace = true
async-stream.workspace = true
arrow-json.workspace = true
tracing = "0.1"

[dev-dependencies]
tokio.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing = "0.1.40"
insta = { version = "1.42.0", features = ["filters"] }

[[example]]
Expand Down
2 changes: 1 addition & 1 deletion datafusion-federation/src/schema_cast/lists_cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ macro_rules! cast_string_to_list_array {
None => list_builder.append_null(),
Some(string_value) => {
decoder.decode(string_value.as_bytes()).map_err(|e| {
ArrowError::CastError(format!("Failed to decode value: {e}"))
ArrowError::CastError(format!("Failed to decode value {string_value}: {e}"))
})?;

if let Some(b) = decoder.flush().map_err(|e| {
Expand Down
78 changes: 64 additions & 14 deletions datafusion-federation/src/schema_cast/record_convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pub enum Error {
source: datafusion::arrow::error::ArrowError,
},

UnableToCastColumn {
source: datafusion::arrow::error::ArrowError,
column_index: usize,
column_name: String,
from_type: DataType,
to_type: DataType,
},

UnexpectedNumberOfColumns {
expected: usize,
found: usize,
Expand All @@ -35,6 +43,18 @@ impl std::fmt::Display for Error {
Error::UnableToConvertRecordBatch { source } => {
write!(f, "Unable to convert record batch: {source}")
}
Error::UnableToCastColumn {
source,
column_index,
column_name,
from_type,
to_type,
} => {
write!(
f,
"Unable to cast column {column_index} '{column_name}' from {from_type} to {to_type}: {source}"
)
}
Error::UnexpectedNumberOfColumns { expected, found } => {
write!(
f,
Expand All @@ -52,6 +72,12 @@ pub fn try_cast_to(record_batch: RecordBatch, expected_schema: SchemaRef) -> Res
let actual_schema = record_batch.schema();

if actual_schema.fields().len() != expected_schema.fields().len() {
tracing::debug!(
actual_schema = ?actual_schema,
expected_schema = ?expected_schema,
"Schema mismatch in try_cast_to"
);
tracing::trace!(record_batch = ?record_batch, "Record batch contents");
return Err(Error::UnexpectedNumberOfColumns {
expected: expected_schema.fields().len(),
found: actual_schema.fields().len(),
Expand All @@ -64,68 +90,92 @@ pub fn try_cast_to(record_batch: RecordBatch, expected_schema: SchemaRef) -> Res
.enumerate()
.map(|(i, expected_field)| {
let record_batch_col = record_batch.column(i);
let from_type = record_batch_col.data_type().clone();
let to_type = expected_field.data_type().clone();
let make_err = |e| Error::UnableToCastColumn {
source: e,
column_index: i,
column_name: expected_field.name().clone(),
from_type: from_type.clone(),
to_type: to_type.clone(),
};

match (record_batch_col.data_type(), expected_field.data_type()) {
(DataType::Utf8, DataType::List(item_type)) => {
cast_string_to_list::<i32>(&Arc::clone(record_batch_col), item_type)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
.map_err(make_err)
}
(DataType::Utf8, DataType::LargeList(item_type)) => {
cast_string_to_large_list::<i32>(&Arc::clone(record_batch_col), item_type)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
.map_err(make_err)
}
(DataType::Utf8, DataType::FixedSizeList(item_type, value_length)) => {
cast_string_to_fixed_size_list::<i32>(
&Arc::clone(record_batch_col),
item_type,
*value_length,
)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
.map_err(make_err)
}
(DataType::Utf8, DataType::Struct(_)) => cast_string_to_struct::<i32>(
&Arc::clone(record_batch_col),
expected_field.clone(),
)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
.map_err(make_err),
(DataType::LargeUtf8, DataType::List(item_type)) => {
cast_string_to_list::<i64>(&Arc::clone(record_batch_col), item_type)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
.map_err(make_err)
}
(DataType::LargeUtf8, DataType::LargeList(item_type)) => {
cast_string_to_large_list::<i64>(&Arc::clone(record_batch_col), item_type)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
.map_err(make_err)
}
(DataType::LargeUtf8, DataType::FixedSizeList(item_type, value_length)) => {
cast_string_to_fixed_size_list::<i64>(
&Arc::clone(record_batch_col),
item_type,
*value_length,
)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
.map_err(make_err)
}
(DataType::LargeUtf8, DataType::Struct(_)) => cast_string_to_struct::<i64>(
&Arc::clone(record_batch_col),
expected_field.clone(),
)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
.map_err(make_err),
(
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::YearMonth),
) => cast_interval_monthdaynano_to_yearmonth(&Arc::clone(record_batch_col))
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
.map_err(make_err),
(
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::DayTime),
) => cast_interval_monthdaynano_to_daytime(&Arc::clone(record_batch_col))
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
.map_err(make_err),
_ => cast(&Arc::clone(record_batch_col), expected_field.data_type())
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
.map_err(make_err),
}
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
.collect::<Result<Vec<Arc<dyn Array>>>>()
.inspect_err(|_| {
tracing::debug!(
actual_schema = ?actual_schema,
expected_schema = ?expected_schema,
"Cast error in try_cast_to"
);
tracing::trace!(record_batch = ?record_batch, "Record batch contents");
})?;

RecordBatch::try_new(expected_schema, cols)
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
RecordBatch::try_new(expected_schema.clone(), cols).map_err(|e| {
tracing::debug!(
actual_schema = ?actual_schema,
expected_schema = ?expected_schema,
"RecordBatch creation error in try_cast_to"
);
tracing::trace!(record_batch = ?record_batch, "Record batch contents");
Error::UnableToConvertRecordBatch { source: e }
})
}

#[cfg(test)]
Expand Down
Loading