Skip to content

Commit 8b2ac11

Browse files
Jeadiesgrebnov
andauthored
Handle NULL sentinel for nullable partition expressions (e.g. 'bucket(N, col)') (spiceai#10880)
* handle NULL sentinel for nullable partition expressions (e.g. 'bucket(N, col)') * handle nullability in PartitionValue * Update 'PartitionValue' * Fix test compile errors after PartitionValue -> HashMap<String, Option<String>> change * linting * change test to handle additional partition * fix bucket count * fix: wrap PartitionValue test fixtures in Some() to match HashMap<String, Option<String>> type * Fix 'test_distributed_acceleration_order_by_limit_pushdown' * fix trunk? * missing colon * fix merge * fmt --------- Co-authored-by: Jeadie <jeadie@users.noreply.github.com> Co-authored-by: Sergei Grebnov <sergei.grebnov@gmail.com>
1 parent ee53043 commit 8b2ac11

9 files changed

Lines changed: 114 additions & 68 deletions

File tree

crates/runtime-cluster/src/executor_registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ pub(crate) fn get_partitions_from_store(
497497
};
498498

499499
// All required partitions (future: filter by query predicates)
500-
let required_partitions: Vec<HashMap<String, String>> = table_metadata
500+
let required_partitions: Vec<HashMap<String, Option<String>>> = table_metadata
501501
.partitions
502502
.iter()
503503
.map(|p| p.partition_value.clone())

crates/runtime-cluster/src/executor_selection.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ mod tests {
143143

144144
fn make_partition(key: &str, value: &str) -> PartitionValue {
145145
let mut p = HashMap::new();
146-
p.insert(key.to_string(), value.to_string());
146+
p.insert(key.to_string(), Some(value.to_string()));
147147
p
148148
}
149149

@@ -279,12 +279,12 @@ mod tests {
279279
#[test]
280280
fn test_select_executors_composite_partitions() {
281281
let mut part_a = HashMap::new();
282-
part_a.insert("region".to_string(), "us-east".to_string());
283-
part_a.insert("date".to_string(), "2024-01-01".to_string());
282+
part_a.insert("region".to_string(), Some("us-east".to_string()));
283+
part_a.insert("date".to_string(), Some("2024-01-01".to_string()));
284284

285285
let mut part_b = HashMap::new();
286-
part_b.insert("region".to_string(), "us-west".to_string());
287-
part_b.insert("date".to_string(), "2024-01-01".to_string());
286+
part_b.insert("region".to_string(), Some("us-west".to_string()));
287+
part_b.insert("date".to_string(), Some("2024-01-01".to_string()));
288288

289289
let required = vec![part_a.clone(), part_b.clone()];
290290

crates/runtime-cluster/src/metadata.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::context::PartitionExprResolver;
3939
/// {"date": "2024-01-01", "region": "us-west"}
4040
/// {"date": "2024-01-02", "region": "us-east"}
4141
/// ```
42-
pub type PartitionValue = HashMap<String, String>;
42+
pub type PartitionValue = HashMap<String, Option<String>>;
4343

4444
/// Metadata for a single partition of an accelerated table
4545
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -56,7 +56,7 @@ pub struct PartitionMetadata {
5656

5757
impl PartitionMetadata {
5858
#[must_use]
59-
pub fn new(partition_value: HashMap<String, String>) -> Self {
59+
pub fn new(partition_value: HashMap<String, Option<String>>) -> Self {
6060
Self {
6161
partition_value,
6262
assigned_executors: Vec::new(),
@@ -99,7 +99,10 @@ pub async fn partition_value_to_bytes(
9999
let mut expr: Option<Expr> = None;
100100
for (partition_expr, val) in p {
101101
let partition_by = resolver.try_parse_expr(tbl, &partition_expr).await?;
102-
let e = partition_by.eq(lit(val));
102+
let e = match val {
103+
None => partition_by.is_null(),
104+
Some(v) => partition_by.eq(lit(v)),
105+
};
103106
expr = match expr {
104107
Some(existing) => Some(existing.and(e)),
105108
None => Some(e),
@@ -179,17 +182,21 @@ impl TablePartitionMetadata {
179182
// key1 = val1 AND key2 = val2 AND ...
180183
let partition_predicate = partition_value
181184
.iter()
182-
.map(|(proj, lit)| {
183-
// Ensure lit is same type as proj
185+
.map(|(proj, val)| {
184186
let col = ctx.parse_sql_expr(proj, &df_schema)?;
187+
let Some(val) = val else {
188+
// NULL partition values need IS NULL, not = NULL
189+
// (SQL: `col = NULL` is always UNKNOWN, never TRUE)
190+
return Ok(col.is_null());
191+
};
185192
let col_type = col.get_type(&df_schema)?;
186-
let mut lit = ctx.parse_sql_expr(lit, &df_schema)?;
187-
if let Expr::Literal(ref s, None) = lit
193+
let mut lit_expr = ctx.parse_sql_expr(val, &df_schema)?;
194+
if let Expr::Literal(ref s, None) = lit_expr
188195
&& s.data_type() != col_type
189196
{
190-
lit = lit.cast_to(&col_type, &df_schema)?;
197+
lit_expr = lit_expr.cast_to(&col_type, &df_schema)?;
191198
}
192-
Ok(col.eq(lit))
199+
Ok(col.eq(lit_expr))
193200
})
194201
.collect::<Result<Vec<Expr>, DataFusionError>>()?
195202
.into_iter()

crates/runtime-cluster/src/service.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,12 +455,12 @@ impl PartitionService {
455455
}
456456
};
457457

458-
let existing: HashSet<Vec<(String, String)>> = existing_partitions
458+
let existing: HashSet<Vec<(String, Option<String>)>> = existing_partitions
459459
.iter()
460460
.map(|p| sorted_kv(&p.partition_value))
461461
.collect();
462462

463-
let source_set: HashSet<Vec<(String, String)>> =
463+
let source_set: HashSet<Vec<(String, Option<String>)>> =
464464
source_partitions.iter().map(sorted_kv).collect();
465465

466466
let new: Vec<PartitionValue> = source_partitions
@@ -585,7 +585,7 @@ fn resolved_equality(a: &TableReference, b: &TableReference) -> bool {
585585
}
586586

587587
/// Sort a `PartitionValue` into a deterministic `Vec<(k, v)>` for equality comparisons.
588-
fn sorted_kv(p: &PartitionValue) -> Vec<(String, String)> {
588+
fn sorted_kv(p: &PartitionValue) -> Vec<(String, Option<String>)> {
589589
let mut v: Vec<_> = p.clone().into_iter().collect();
590590
v.sort();
591591
v
@@ -1176,7 +1176,7 @@ mod tests {
11761176
}
11771177

11781178
fn pv(key: &str, val: &str) -> PartitionValue {
1179-
HashMap::from([(key.to_string(), val.to_string())])
1179+
HashMap::from([(key.to_string(), Some(val.to_string()))])
11801180
}
11811181

11821182
async fn setup_table(store: &PartitionStore, table: &str, partitions: Vec<PartitionMetadata>) {

crates/runtime-cluster/src/store.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ impl PartitionStore {
225225
pub async fn set_unassigned_partitions(
226226
&self,
227227
table: &TableReference,
228-
partition_values: Vec<HashMap<String, String>>,
228+
partition_values: Vec<HashMap<String, Option<String>>>,
229229
partition_expressions: Vec<String>,
230230
) -> Result<()> {
231231
let key = normalized_table_name(table);
@@ -664,7 +664,7 @@ mod tests {
664664
}
665665

666666
fn partition_value(key: &str, val: &str) -> PartitionValue {
667-
HashMap::from([(key.to_string(), val.to_string())])
667+
HashMap::from([(key.to_string(), Some(val.to_string()))])
668668
}
669669

670670
#[tokio::test]
@@ -702,7 +702,7 @@ mod tests {
702702
metadata
703703
.partitions
704704
.iter()
705-
.find(|p| p.partition_value.get("bucket(3, id)") == Some(&val.to_string()))
705+
.find(|p| p.partition_value.get("bucket(3, id)") == Some(&Some(val.to_string())))
706706
.expect("partition not found")
707707
};
708708

@@ -786,14 +786,14 @@ mod tests {
786786
let east = metadata
787787
.partitions
788788
.iter()
789-
.find(|p| p.partition_value.get("region") == Some(&"us-east".to_string()))
789+
.find(|p| p.partition_value.get("region") == Some(&Some("us-east".to_string())))
790790
.expect("us-east");
791791
assert!(east.is_assigned_to("executor-1"));
792792

793793
let west = metadata
794794
.partitions
795795
.iter()
796-
.find(|p| p.partition_value.get("region") == Some(&"us-west".to_string()))
796+
.find(|p| p.partition_value.get("region") == Some(&Some("us-west".to_string())))
797797
.expect("us-west");
798798
assert!(west.is_assigned_to("executor-2"));
799799
}
@@ -830,12 +830,12 @@ mod tests {
830830
.await
831831
.expect("should initialize");
832832

833-
let pv = HashMap::from([("region".to_string(), "us-east-1".to_string())]);
833+
let pv = HashMap::from([("region".to_string(), Some("us-east-1".to_string()))]);
834834
pm.set_unassigned_partitions(&source, vec![pv], vec![])
835835
.await
836836
.expect("should set partitions");
837837
let partition_value: PartitionValue =
838-
HashMap::from([("region".to_string(), "us-east-1".to_string())]);
838+
HashMap::from([("region".to_string(), Some("us-east-1".to_string()))]);
839839
pm.assign_partition(&source, &partition_value, "executor-1")
840840
.await
841841
.expect("should assign");
@@ -890,7 +890,7 @@ mod tests {
890890
.await
891891
.expect("should initialize target");
892892

893-
let pv = HashMap::from([("region".to_string(), "eu-west-1".to_string())]);
893+
let pv = HashMap::from([("region".to_string(), Some("eu-west-1".to_string()))]);
894894
pm.set_unassigned_partitions(&source, vec![pv], vec![])
895895
.await
896896
.expect("should set partitions");
@@ -959,7 +959,7 @@ mod tests {
959959
.await
960960
.expect("should initialize");
961961

962-
let pv = HashMap::from([("org_id".to_string(), "test_org_name".to_string())]);
962+
let pv = HashMap::from([("org_id".to_string(), Some("test_org_name".to_string()))]);
963963
pm.set_unassigned_partitions(&bare, vec![pv], vec![])
964964
.await
965965
.expect("should set partitions");
@@ -973,7 +973,7 @@ mod tests {
973973
assert_eq!(meta_via_full.partitions.len(), 1);
974974

975975
let partition_value: PartitionValue =
976-
HashMap::from([("org_id".to_string(), "test_org_name".to_string())]);
976+
HashMap::from([("org_id".to_string(), Some("test_org_name".to_string()))]);
977977
pm.assign_partition(&full, &partition_value, "executor-1")
978978
.await
979979
.expect("should assign via fully qualified ref");
@@ -999,7 +999,7 @@ mod tests {
999999
acc.initialize_metadata(&table, vec!["region".to_string()])
10001000
.await
10011001
.expect("init");
1002-
let pv = HashMap::from([("region".to_string(), "us-east-1".to_string())]);
1002+
let pv = HashMap::from([("region".to_string(), Some("us-east-1".to_string()))]);
10031003
acc.set_unassigned_partitions(&table, vec![pv.clone()], vec![])
10041004
.await
10051005
.expect("set");
@@ -1018,8 +1018,8 @@ mod tests {
10181018
pm.initialize_metadata(&table, vec!["region".to_string()])
10191019
.await
10201020
.expect("init");
1021-
let values: Vec<HashMap<String, String>> = (0..50)
1022-
.map(|i| HashMap::from([("region".to_string(), format!("r-{i}"))]))
1021+
let values: Vec<HashMap<String, Option<String>>> = (0..50)
1022+
.map(|i| HashMap::from([("region".to_string(), Some(format!("r-{i}")))]))
10231023
.collect();
10241024
pm.set_unassigned_partitions(&table, values.clone(), vec![])
10251025
.await

crates/runtime-cluster/src/write_through.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,14 @@ async fn route_batch_and_assign_unseen(
454454
let partition_value: PartitionValue = partition_expr_keys
455455
.iter()
456456
.zip(scalar_values.iter())
457-
.map(|(expr_key, scalar)| (expr_key.clone(), scalar_to_sql_literal(scalar)))
457+
.map(|(expr_key, scalar)| {
458+
let val = if scalar.is_null() {
459+
None
460+
} else {
461+
Some(scalar_to_sql_literal(scalar))
462+
};
463+
(expr_key.clone(), val)
464+
})
458465
.collect();
459466
Some((scalar_values, partition_value, sub_batch))
460467
})
@@ -510,7 +517,13 @@ async fn route_batch_and_assign_unseen(
510517
let new_pred = partition_phys_exprs
511518
.iter()
512519
.zip(scalar_values.iter())
513-
.map(|((logical_expr, _), scalar)| logical_expr.clone().eq(lit(scalar.clone())))
520+
.map(|((logical_expr, _), scalar)| {
521+
if scalar.is_null() {
522+
logical_expr.clone().is_null()
523+
} else {
524+
logical_expr.clone().eq(lit(scalar.clone()))
525+
}
526+
})
514527
.reduce(Expr::and);
515528

516529
if let Some(new_pred) = new_pred {

crates/runtime-datafusion/src/analyzer_rule/partitioned_table_scan_rewrite.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use parking_lot::RwLock;
5050
/// {"date": "2024-01-01", "region": "us-west"}
5151
/// {"date": "2024-01-02", "region": "us-east"}
5252
/// ```
53-
pub type PartitionValue = HashMap<String, String>;
53+
pub type PartitionValue = HashMap<String, Option<String>>;
5454

5555
/// Define how to get partitions for a given table, and how they are partitioned.
5656
pub trait TablePartitionProvider: Send + Sync + Debug {
@@ -378,16 +378,19 @@ fn partition_value_to_expr(
378378
) -> Result<Option<Expr>, DataFusionError> {
379379
let mut expr: Option<Expr> = None;
380380
for (partition_expr_str, val) in pv {
381-
let new_expr = state
381+
let col_expr = state
382382
.upgrade()
383383
.ok_or_else(|| {
384384
DataFusionError::Plan(
385385
"SessionState has been dropped, cannot parse partition expression".to_string(),
386386
)
387387
})?
388388
.read()
389-
.create_logical_expr(partition_expr_str, df_schema)?
390-
.eq(lit(val.clone()));
389+
.create_logical_expr(partition_expr_str, df_schema)?;
390+
let new_expr = match val {
391+
None => col_expr.is_null(),
392+
Some(v) => col_expr.eq(lit(v.clone())),
393+
};
391394
expr = match expr {
392395
Some(existing) => Some(existing.and(new_expr)),
393396
None => Some(new_expr),
@@ -426,14 +429,14 @@ mod tests {
426429
p1,
427430
vec![HashMap::from([(
428431
"partition_id".to_string(),
429-
"0".to_string(),
432+
Some("0".to_string()),
430433
)])],
431434
),
432435
(
433436
p2,
434437
vec![HashMap::from([(
435438
"partition_id".to_string(),
436-
"1".to_string(),
439+
Some("1".to_string()),
437440
)])],
438441
),
439442
]

0 commit comments

Comments
 (0)