Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,386 changes: 1,694 additions & 1,692 deletions .schema/spicepod.schema.json

Large diffs are not rendered by default.

184 changes: 92 additions & 92 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ homepage = "https://spice.ai"
license = "Apache-2.0"
repository = "https://github.com/spiceai/spiceai"
rust-version = "1.94.1"
version = "2.0.0-rc.3"
version = "2.0.0-unstable"

[workspace.dependencies]
adbc_core = "0.23"
Expand Down
80 changes: 53 additions & 27 deletions crates/data_components/src/turso.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2157,24 +2157,50 @@ fn scalar_value_to_turso(
/// ```
/// into:
/// ```sql
/// CAST(expr AS REAL) >= CAST(low AS REAL)
/// AND CAST(expr AS REAL) <= CAST(high AS REAL)
/// ROUND(expr, 10) >= ROUND(low, 10)
/// AND ROUND(expr, 10) <= ROUND(high, 10)
/// ```
/// (with the obvious inversion for `NOT BETWEEN`).
///
/// `ROUND(..., 10)` normalizes both sides to 10 decimal places, which
/// eliminates float arithmetic precision issues. For example, float64
/// `0.06 + 0.01 = 0.06999...` is less than the stored `0.07`, but
/// `ROUND(0.06 + 0.01, 10) = 0.07` matches `ROUND(0.07, 10)` exactly.
///
/// Only expressions where *both* bounds appear numeric (literal numbers,
/// unary-minus numbers, or arithmetic on numbers) are rewritten.
struct TursoBetweenVisitor;

/// Number of decimal places used by `ROUND()` to normalize float values.
const TURSO_ROUND_DECIMAL_PLACES: u8 = 10;

impl TursoBetweenVisitor {
/// Wrap `expr` in `CAST(expr AS REAL)`.
fn cast_to_real(expr: sqlast::Expr) -> sqlast::Expr {
sqlast::Expr::Cast {
kind: sqlast::CastKind::Cast,
expr: Box::new(expr),
data_type: sqlast::DataType::Real,
format: None,
}
/// Wrap `expr` in `ROUND(expr, N)`.
fn round_expr(expr: sqlast::Expr) -> sqlast::Expr {
sqlast::Expr::Function(sqlast::Function {
name: sqlast::ObjectName(vec![sqlast::ObjectNamePart::Identifier(
sqlast::Ident::new("ROUND"),
)]),
args: sqlast::FunctionArguments::List(sqlast::FunctionArgumentList {
duplicate_treatment: None,
args: vec![
sqlast::FunctionArg::Unnamed(sqlast::FunctionArgExpr::Expr(expr)),
sqlast::FunctionArg::Unnamed(sqlast::FunctionArgExpr::Expr(
sqlast::Expr::value(sqlast::Value::Number(
TURSO_ROUND_DECIMAL_PLACES.to_string(),
false,
)),
)),
],
clauses: vec![],
}),
filter: None,
null_treatment: None,
over: None,
within_group: vec![],
parameters: sqlast::FunctionArguments::None,
uses_odbc_syntax: false,
})
}

/// Returns `true` if the expression looks like a numeric value or
Expand Down Expand Up @@ -2209,39 +2235,39 @@ impl VisitorMut for TursoBetweenVisitor {
&& Self::is_numeric_expr(high)
{
let negated = *negated;
let cast_expr_low = Self::cast_to_real(*between_expr.clone());
let cast_expr_high = Self::cast_to_real(*between_expr.clone());
let cast_low = Self::cast_to_real(*low.clone());
let cast_high = Self::cast_to_real(*high.clone());
let round_expr_low = Self::round_expr(*between_expr.clone());
let round_expr_high = Self::round_expr(*between_expr.clone());
let round_low = Self::round_expr(*low.clone());
let round_high = Self::round_expr(*high.clone());

if negated {
// NOT BETWEEN → expr < low OR expr > high
*expr = sqlast::Expr::BinaryOp {
left: Box::new(sqlast::Expr::BinaryOp {
left: Box::new(cast_expr_low),
left: Box::new(round_expr_low),
op: sqlast::BinaryOperator::Lt,
right: Box::new(cast_low),
right: Box::new(round_low),
}),
op: sqlast::BinaryOperator::Or,
right: Box::new(sqlast::Expr::BinaryOp {
left: Box::new(cast_expr_high),
left: Box::new(round_expr_high),
op: sqlast::BinaryOperator::Gt,
right: Box::new(cast_high),
right: Box::new(round_high),
}),
};
} else {
// BETWEEN → expr >= low AND expr <= high
*expr = sqlast::Expr::BinaryOp {
left: Box::new(sqlast::Expr::BinaryOp {
left: Box::new(cast_expr_low),
left: Box::new(round_expr_low),
op: sqlast::BinaryOperator::GtEq,
right: Box::new(cast_low),
right: Box::new(round_low),
}),
op: sqlast::BinaryOperator::And,
right: Box::new(sqlast::Expr::BinaryOp {
left: Box::new(cast_expr_high),
left: Box::new(round_expr_high),
op: sqlast::BinaryOperator::LtEq,
right: Box::new(cast_high),
right: Box::new(round_high),
}),
};
}
Expand Down Expand Up @@ -2274,12 +2300,12 @@ mod tests {
"BETWEEN should be rewritten, got: {result}"
);
assert!(
result.contains("CAST(x AS REAL) >= CAST(0.05 AS REAL)"),
"should cast to REAL: {result}"
result.contains("ROUND(x, 10) >= ROUND(0.05, 10)"),
"should use ROUND: {result}"
);
assert!(
result.contains("CAST(x AS REAL) <= CAST(0.07 AS REAL)"),
"should cast to REAL: {result}"
result.contains("ROUND(x, 10) <= ROUND(0.07, 10)"),
"should use ROUND: {result}"
);
}

Expand All @@ -2291,7 +2317,7 @@ mod tests {
!result.contains("BETWEEN"),
"BETWEEN with arithmetic bounds should be rewritten, got: {result}"
);
assert!(result.contains("CAST"), "should contain CAST: {result}");
assert!(result.contains("ROUND"), "should contain ROUND: {result}");
}

#[test]
Expand Down
11 changes: 11 additions & 0 deletions crates/spicepod/src/acceleration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,23 @@ pub struct Acceleration {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metrics: Option<Metrics>,

/// Partition expressions used to physically partition accelerated data.
///
/// Each item accepts either:
/// - a plain expression string, for example `"YEAR(created_at)"` or
/// `"bucket(100, user_id)"`; or
/// - a single-entry mapping of a partition name to an expression, for
/// example `{ year: "YEAR(created_at)" }`.
#[serde(
default,
skip_serializing_if = "Vec::is_empty",
serialize_with = "serialize_partition_by",
deserialize_with = "deserialize_partition_by"
)]
#[cfg_attr(
feature = "schemars",
schemars(with = "Vec<crate::partitioning::PartitionedBySchema>")
)]
pub partition_by: Vec<PartitionedBy>,

/// Enables snapshots for this dataset, requires the top-level config `snapshots` to be defined.
Expand Down
157 changes: 145 additions & 12 deletions crates/spicepod/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,63 @@ pub struct PartitionedBy {
pub expression: String,
}

#[cfg(feature = "schemars")]
#[derive(JsonSchema)]
#[serde(untagged)]
pub enum PartitionedBySchema {
Expression(String),
#[schemars(extend("minProperties" = 1, "maxProperties" = 1))]
Named(std::collections::HashMap<String, String>),
}

pub fn deserialize_partition_by<'de, D>(deserializer: D) -> Result<Vec<PartitionedBy>, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;

let values = Vec::<serde_json::Value>::deserialize(deserializer)?;

let mut result = Vec::new();
let mut result = Vec::with_capacity(values.len());

for value in values {
for (idx, value) in values.into_iter().enumerate() {
match value {
serde_json::Value::String(expression) => {
let name = format!("expr{i}", i = result.len());
let partitioned_by = PartitionedBy { name, expression };
result.push(partitioned_by);
result.push(PartitionedBy { name, expression });
}
serde_json::Value::Object(map) => {
// case where {"year": "YEAR(created_at)"}
for (name, v) in map {
if let serde_json::Value::String(expression) = v {
let partitioned_by = PartitionedBy { name, expression };
result.push(partitioned_by);
break; // take first string and ignore others
}
// Accepts only a single-entry `{name: expression_string}` mapping.
if map.len() != 1 {
return Err(D::Error::custom(format!(
"partition_by[{idx}]: named partition must be a single-entry mapping of `name: expression_string`, found {} entries",
map.len()
)));
}
// Safe: len == 1.
let (name, v) = map.into_iter().next().ok_or_else(|| {
D::Error::custom(format!("partition_by[{idx}]: unexpected empty mapping"))
})?;
let serde_json::Value::String(expression) = v else {
return Err(D::Error::custom(format!(
"partition_by[{idx}]: named partition value for `{name}` must be a string expression"
)));
};
result.push(PartitionedBy { name, expression });
}
other => {
let kind = match other {
serde_json::Value::Null => "null",
serde_json::Value::Bool(_) => "bool",
serde_json::Value::Number(_) => "number",
serde_json::Value::Array(_) => "array",
// String/Object handled above.
_ => "unsupported value",
};
return Err(D::Error::custom(format!(
"partition_by[{idx}]: expected a string expression or a single-entry `{{name: expression}}` mapping, found {kind}"
)));
}
_ => {}
}
}

Expand Down Expand Up @@ -138,4 +169,106 @@ partition_by:
assert_eq!(result.partition_by[2].expression, "DAY(created_at)");
Ok(())
}

#[test]
fn deserialize_partition_by_rejects_multi_entry_map() {
let yaml = r#"
partition_by:
- year: "YEAR(created_at)"
month: "MONTH(created_at)"
"#;
let err = from_str::<Test>(yaml).expect_err("multi-entry mapping must be rejected");
let msg = err.to_string();
assert!(
msg.contains("single-entry mapping"),
"unexpected error: {msg}"
);
}

#[test]
fn deserialize_partition_by_rejects_non_string_value() {
let yaml = r"
partition_by:
- year: 2024
";
let err = from_str::<Test>(yaml).expect_err("non-string expression must be rejected");
let msg = err.to_string();
assert!(
msg.contains("must be a string expression"),
"unexpected error: {msg}"
);
}

#[test]
fn deserialize_partition_by_rejects_scalar_items() {
let yaml = r"
partition_by:
- 42
";
let err = from_str::<Test>(yaml).expect_err("non-string, non-object item must be rejected");
let msg = err.to_string();
assert!(
msg.contains("expected a string expression"),
"unexpected error: {msg}"
);
}

/// Guards against regressions in the generated JSON schema for
/// `PartitionedBySchema`: it must describe both a plain expression string
/// and a single-entry `{name: expr}` object (with `minProperties = 1` /
/// `maxProperties = 1`).
#[cfg(feature = "schemars")]
#[test]
fn partition_by_schema_shapes() {
use schemars::schema_for;

let schema = schema_for!(PartitionedBySchema);
let value = serde_json::to_value(&schema).expect("serialize schema");

let any_of = value
.get("anyOf")
.and_then(|v| v.as_array())
.expect("PartitionedBySchema must generate an anyOf of the accepted shapes");
assert_eq!(
any_of.len(),
2,
"PartitionedBySchema should have two accepted shapes (string | single-entry map)"
);

// Shape 1: plain expression string.
assert!(
any_of
.iter()
.any(|v| v.get("type").and_then(|t| t.as_str()) == Some("string")),
"PartitionedBySchema must accept a plain string expression"
);

// Shape 2: single-entry object mapping name -> expression.
let named = any_of
.iter()
.find(|v| v.get("type").and_then(|t| t.as_str()) == Some("object"))
.expect("PartitionedBySchema must accept an object shape");
assert_eq!(
named
.get("minProperties")
.and_then(serde_json::Value::as_u64),
Some(1),
"named partition mapping must require at least one entry"
);
assert_eq!(
named
.get("maxProperties")
.and_then(serde_json::Value::as_u64),
Some(1),
"named partition mapping must allow at most one entry"
);
assert_eq!(
named
.get("additionalProperties")
.and_then(|v| v.get("type"))
.and_then(|t| t.as_str()),
Some("string"),
"named partition mapping values must be strings"
);
}
}
14 changes: 13 additions & 1 deletion crates/spicepod/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize};

use crate::{
param::Params,
partitioning::{PartitionedBy, deserialize_partition_by},
partitioning::{PartitionedBy, deserialize_partition_by, serialize_partition_by},
};

#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand All @@ -33,11 +33,23 @@ pub struct VectorStore {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub engine: Option<String>,

/// Partition expressions used to organize vector data.
///
/// Each item accepts either:
/// - a plain expression string, for example `"YEAR(created_at)"` or
/// `"bucket(100, user_id)"`; or
/// - a single-entry mapping of a partition name to an expression, for
/// example `{ year: "YEAR(created_at)" }`.
#[serde(
default,
skip_serializing_if = "Vec::is_empty",
serialize_with = "serialize_partition_by",
deserialize_with = "deserialize_partition_by"
)]
#[cfg_attr(
feature = "schemars",
schemars(with = "Vec<crate::partitioning::PartitionedBySchema>")
)]
pub partition_by: Vec<PartitionedBy>,

#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down
Loading
Loading