Skip to content

Commit fa8a410

Browse files
authored
Merge branch 'spiceai-50' into lukim/explain
2 parents 12c3739 + afcbd1c commit fa8a410

3 files changed

Lines changed: 66 additions & 16 deletions

File tree

datafusion-federation/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,11 +26,11 @@ async-trait.workspace = true
2626
datafusion.workspace = true
2727
async-stream.workspace = true
2828
arrow-json.workspace = true
29+
tracing = "0.1"
2930

3031
[dev-dependencies]
3132
tokio.workspace = true
3233
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
33-
tracing = "0.1.40"
3434
insta = { version = "1.42.0", features = ["filters"] }
3535

3636
[[example]]

datafusion-federation/src/schema_cast/lists_cast.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ macro_rules! cast_string_to_list_array {
3030
None => list_builder.append_null(),
3131
Some(string_value) => {
3232
decoder.decode(string_value.as_bytes()).map_err(|e| {
33-
ArrowError::CastError(format!("Failed to decode value: {e}"))
33+
ArrowError::CastError(format!("Failed to decode value {string_value}: {e}"))
3434
})?;
3535

3636
if let Some(b) = decoder.flush().map_err(|e| {

datafusion-federation/src/schema_cast/record_convert.rs

Lines changed: 64 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,14 @@ pub enum Error {
2121
source: datafusion::arrow::error::ArrowError,
2222
},
2323

24+
UnableToCastColumn {
25+
source: datafusion::arrow::error::ArrowError,
26+
column_index: usize,
27+
column_name: String,
28+
from_type: DataType,
29+
to_type: DataType,
30+
},
31+
2432
UnexpectedNumberOfColumns {
2533
expected: usize,
2634
found: usize,
@@ -35,6 +43,18 @@ impl std::fmt::Display for Error {
3543
Error::UnableToConvertRecordBatch { source } => {
3644
write!(f, "Unable to convert record batch: {source}")
3745
}
46+
Error::UnableToCastColumn {
47+
source,
48+
column_index,
49+
column_name,
50+
from_type,
51+
to_type,
52+
} => {
53+
write!(
54+
f,
55+
"Unable to cast column {column_index} '{column_name}' from {from_type} to {to_type}: {source}"
56+
)
57+
}
3858
Error::UnexpectedNumberOfColumns { expected, found } => {
3959
write!(
4060
f,
@@ -52,6 +72,12 @@ pub fn try_cast_to(record_batch: RecordBatch, expected_schema: SchemaRef) -> Res
5272
let actual_schema = record_batch.schema();
5373

5474
if actual_schema.fields().len() != expected_schema.fields().len() {
75+
tracing::debug!(
76+
actual_schema = ?actual_schema,
77+
expected_schema = ?expected_schema,
78+
"Schema mismatch in try_cast_to"
79+
);
80+
tracing::trace!(record_batch = ?record_batch, "Record batch contents");
5581
return Err(Error::UnexpectedNumberOfColumns {
5682
expected: expected_schema.fields().len(),
5783
found: actual_schema.fields().len(),
@@ -64,68 +90,92 @@ pub fn try_cast_to(record_batch: RecordBatch, expected_schema: SchemaRef) -> Res
6490
.enumerate()
6591
.map(|(i, expected_field)| {
6692
let record_batch_col = record_batch.column(i);
93+
let from_type = record_batch_col.data_type().clone();
94+
let to_type = expected_field.data_type().clone();
95+
let make_err = |e| Error::UnableToCastColumn {
96+
source: e,
97+
column_index: i,
98+
column_name: expected_field.name().clone(),
99+
from_type: from_type.clone(),
100+
to_type: to_type.clone(),
101+
};
67102

68103
match (record_batch_col.data_type(), expected_field.data_type()) {
69104
(DataType::Utf8, DataType::List(item_type)) => {
70105
cast_string_to_list::<i32>(&Arc::clone(record_batch_col), item_type)
71-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
106+
.map_err(make_err)
72107
}
73108
(DataType::Utf8, DataType::LargeList(item_type)) => {
74109
cast_string_to_large_list::<i32>(&Arc::clone(record_batch_col), item_type)
75-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
110+
.map_err(make_err)
76111
}
77112
(DataType::Utf8, DataType::FixedSizeList(item_type, value_length)) => {
78113
cast_string_to_fixed_size_list::<i32>(
79114
&Arc::clone(record_batch_col),
80115
item_type,
81116
*value_length,
82117
)
83-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
118+
.map_err(make_err)
84119
}
85120
(DataType::Utf8, DataType::Struct(_)) => cast_string_to_struct::<i32>(
86121
&Arc::clone(record_batch_col),
87122
expected_field.clone(),
88123
)
89-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
124+
.map_err(make_err),
90125
(DataType::LargeUtf8, DataType::List(item_type)) => {
91126
cast_string_to_list::<i64>(&Arc::clone(record_batch_col), item_type)
92-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
127+
.map_err(make_err)
93128
}
94129
(DataType::LargeUtf8, DataType::LargeList(item_type)) => {
95130
cast_string_to_large_list::<i64>(&Arc::clone(record_batch_col), item_type)
96-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
131+
.map_err(make_err)
97132
}
98133
(DataType::LargeUtf8, DataType::FixedSizeList(item_type, value_length)) => {
99134
cast_string_to_fixed_size_list::<i64>(
100135
&Arc::clone(record_batch_col),
101136
item_type,
102137
*value_length,
103138
)
104-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
139+
.map_err(make_err)
105140
}
106141
(DataType::LargeUtf8, DataType::Struct(_)) => cast_string_to_struct::<i64>(
107142
&Arc::clone(record_batch_col),
108143
expected_field.clone(),
109144
)
110-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
145+
.map_err(make_err),
111146
(
112147
DataType::Interval(IntervalUnit::MonthDayNano),
113148
DataType::Interval(IntervalUnit::YearMonth),
114149
) => cast_interval_monthdaynano_to_yearmonth(&Arc::clone(record_batch_col))
115-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
150+
.map_err(make_err),
116151
(
117152
DataType::Interval(IntervalUnit::MonthDayNano),
118153
DataType::Interval(IntervalUnit::DayTime),
119154
) => cast_interval_monthdaynano_to_daytime(&Arc::clone(record_batch_col))
120-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
155+
.map_err(make_err),
121156
_ => cast(&Arc::clone(record_batch_col), expected_field.data_type())
122-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e }),
157+
.map_err(make_err),
123158
}
124159
})
125-
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
160+
.collect::<Result<Vec<Arc<dyn Array>>>>()
161+
.inspect_err(|_| {
162+
tracing::debug!(
163+
actual_schema = ?actual_schema,
164+
expected_schema = ?expected_schema,
165+
"Cast error in try_cast_to"
166+
);
167+
tracing::trace!(record_batch = ?record_batch, "Record batch contents");
168+
})?;
126169

127-
RecordBatch::try_new(expected_schema, cols)
128-
.map_err(|e| Error::UnableToConvertRecordBatch { source: e })
170+
RecordBatch::try_new(expected_schema.clone(), cols).map_err(|e| {
171+
tracing::debug!(
172+
actual_schema = ?actual_schema,
173+
expected_schema = ?expected_schema,
174+
"RecordBatch creation error in try_cast_to"
175+
);
176+
tracing::trace!(record_batch = ?record_batch, "Record batch contents");
177+
Error::UnableToConvertRecordBatch { source: e }
178+
})
129179
}
130180

131181
#[cfg(test)]

0 commit comments

Comments
 (0)