Skip to content

Commit 0460497

Browse files
committed
fixed filtering of nulls and mixed types including AnyValues
1 parent bc4cec8 commit 0460497

4 files changed

Lines changed: 1734 additions & 110 deletions

File tree

rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ use crate::pipeline::expr::{
6161
};
6262
use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor};
6363
use crate::pipeline::project::anyval::{
64-
fill_null_type_as_empty, is_any_value_data_type, project_any_value_columns,
65-
wrap_as_any_value_struct,
64+
attempt_coerce_value_column_from_any_value_struct_column, fill_null_type_as_empty,
65+
is_any_value_data_type, wrap_as_any_value_struct,
6666
};
6767
use crate::pipeline::project::{ProjectedSchemaColumn, Projection};
6868
use crate::pipeline::state::ExecutionState;
@@ -1660,32 +1660,6 @@ fn coerce_to_any_value_struct_column(values: ArrayRef) -> Result<ArrayRef> {
16601660
}
16611661
}
16621662

1663-
/// Attempt to coerce the values array (a struct array representing an AnyValue) into the concrete
1664-
/// values type. If the passed array contains multiple types, the original array is returned
1665-
/// because coercion was not successful.
1666-
fn attempt_coerce_value_column_from_any_value_struct_column(values: &ArrayRef) -> Result<ArrayRef> {
1667-
// build a temporary record batch containing the column to maybe partition by type
1668-
let rb = RecordBatch::try_new(
1669-
Arc::new(Schema::new(vec![Field::new(
1670-
"",
1671-
values.data_type().clone(),
1672-
true,
1673-
)])),
1674-
vec![Arc::clone(values)],
1675-
)?;
1676-
1677-
// attempt to partition the AnyValue column by type
1678-
let partitions = project_any_value_columns(&rb, &[0])?;
1679-
let result = if partitions.len() == 1 {
1680-
partitions[0].batch.column(0)
1681-
} else {
1682-
// just return the original array
1683-
values
1684-
};
1685-
1686-
Ok(Arc::clone(result))
1687-
}
1688-
16891663
/// Inserts the column into the record batch if the column does not exist, otherwise replaces the
16901664
/// existing column with the new one.
16911665
///

0 commit comments

Comments
 (0)