Skip to content

Commit ba06a25

Browse files
paleolimbotCopilot
andauthored
refactor(rust/sedona-spatial-join): Support wraparound rectangles in EvaluatedGeometryArray (apache#799)
Co-authored-by: Copilot <copilot@github.com>
1 parent f020db4 commit ba06a25

26 files changed

Lines changed: 1181 additions & 387 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

c/sedona-libgpuspatial/src/lib.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use arrow_schema::DataType;
19-
use geo_types::Rect;
2019

2120
mod error;
2221
#[cfg(gpu_available)]
@@ -113,9 +112,10 @@ mod sys {
113112
/// Inserts a batch of bounding boxes into the index.
114113
/// Each rectangle is represented as a `Rect<f32>` with minimum and maximum x and y coordinates.
115114
/// This method accumulates these rectangles until `finish_building` is called to finalize the index.
116-
/// The method can be called multiple times to insert data in batches before finalizing.
117-
pub fn push_build(&mut self, rects: &[Rect<f32>]) -> Result<()> {
118-
// Re-interpreting Rect<f32> as flat f32 array (xmin, ymin, xmax, ymax)
115+
/// The method can be called multiple times to insert data in batches before finalizing. The values
116+
/// in rects are ordered (xmin, ymin, xmax, ymax).
117+
pub fn push_build(&mut self, rects: &[[f32; 4]]) -> Result<()> {
118+
// Re-interpreting rects as a flat f32 array (xmin, ymin, xmax, ymax)
119119
let raw_ptr = rects.as_ptr() as *const f32;
120120
self.inner.push_build(raw_ptr, rects.len() as u32)
121121
}
@@ -126,7 +126,8 @@ mod sys {
126126
}
127127

128128
/// Probes the spatial index with a batch of rectangles and returns pairs of matching indices from the build and probe sets.
129-
pub fn probe(&self, rects: &[Rect<f32>]) -> Result<(Vec<u32>, Vec<u32>)> {
129+
/// The values in rects are ordered (xmin, ymin, xmax, ymax).
130+
pub fn probe(&self, rects: &[[f32; 4]]) -> Result<(Vec<u32>, Vec<u32>)> {
130131
let raw_ptr = rects.as_ptr() as *const f32;
131132
self.inner.probe(raw_ptr, rects.len() as u32)
132133
}
@@ -203,13 +204,13 @@ mod sys {
203204
Err(GpuSpatialError::GpuNotAvailable)
204205
}
205206
pub fn clear(&mut self) {}
206-
pub fn push_build(&mut self, _r: &[Rect<f32>]) -> Result<()> {
207+
pub fn push_build(&mut self, _r: &[[f32; 4]]) -> Result<()> {
207208
Err(GpuSpatialError::GpuNotAvailable)
208209
}
209210
pub fn finish_building(&mut self) -> Result<GpuSpatialIndex> {
210211
Err(GpuSpatialError::GpuNotAvailable)
211212
}
212-
pub fn probe(&self, _r: &[Rect<f32>]) -> Result<(Vec<u32>, Vec<u32>)> {
213+
pub fn probe(&self, _r: &[[f32; 4]]) -> Result<(Vec<u32>, Vec<u32>)> {
213214
Err(GpuSpatialError::GpuNotAvailable)
214215
}
215216
}
@@ -268,13 +269,16 @@ mod tests {
268269
Some("POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"),
269270
Some("POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))"),
270271
];
271-
let rects: Vec<Rect<f32>> = polygon_values
272+
let rects: Vec<_> = polygon_values
272273
.iter()
273274
.map(|w| {
274-
Polygon::try_from_wkt_str(w.unwrap())
275+
let rect = Polygon::try_from_wkt_str(w.unwrap())
275276
.unwrap()
276277
.bounding_rect()
277-
.unwrap()
278+
.unwrap();
279+
let min = rect.min();
280+
let max = rect.max();
281+
(min.x, min.y, max.x, max.y)
278282
})
279283
.collect();
280284

@@ -286,9 +290,14 @@ mod tests {
286290

287291
// 4. Probe (Index is immutable and safe)
288292
let point_values = &[Some("POINT (30 20)")];
289-
let points: Vec<Rect<f32>> = point_values
293+
let points: Vec<_> = point_values
290294
.iter()
291-
.map(|w| Point::try_from_wkt_str(w.unwrap()).unwrap().bounding_rect())
295+
.map(|w| {
296+
let rect = Point::try_from_wkt_str(w.unwrap()).unwrap().bounding_rect();
297+
let min = rect.min();
298+
let max = rect.max();
299+
(min.x, min.y, max.x, max.y)
300+
})
292301
.collect();
293302

294303
let (build_idx, probe_idx) = index.probe(&points).unwrap();

rust/sedona-functions/src/st_analyze_agg.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,17 @@ use arrow_schema::{DataType, Field, FieldRef};
2626
use datafusion_common::{
2727
cast::as_binary_array,
2828
error::{DataFusionError, Result},
29-
ScalarValue,
29+
exec_datafusion_err, ScalarValue,
3030
};
3131
use datafusion_expr::Volatility;
3232
use datafusion_expr::{Accumulator, ColumnarValue};
3333
use sedona_common::{sedona_internal_datafusion_err, sedona_internal_err};
3434
use sedona_expr::aggregate_udf::{SedonaAccumulatorRef, SedonaAggregateUDF};
3535
use sedona_expr::item_crs::ItemCrsSedonaAccumulator;
3636
use sedona_expr::{aggregate_udf::SedonaAccumulator, statistics::GeoStatistics};
37-
use sedona_geometry::analyze::GeometrySummary;
37+
use sedona_geometry::analyze::{analyze_wkb, GeometrySummary};
38+
use sedona_geometry::bounding_box::BoundingBox;
39+
use sedona_geometry::bounds::geo_traits_bounds_xy;
3840
use sedona_geometry::interval::IntervalTrait;
3941
use sedona_geometry::types::{GeometryTypeAndDimensions, GeometryTypeAndDimensionsSet};
4042
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
@@ -77,12 +79,9 @@ impl SedonaAccumulator for STAnalyzeAgg {
7779
fn accumulator(
7880
&self,
7981
args: &[SedonaType],
80-
output_type: &SedonaType,
82+
_output_type: &SedonaType,
8183
) -> Result<Box<dyn Accumulator>> {
82-
Ok(Box::new(AnalyzeAccumulator::new(
83-
args[0].clone(),
84-
output_type.clone(),
85-
)))
84+
Ok(Box::new(AnalyzeAccumulator::new(args[0].clone())))
8685
}
8786

8887
fn state_fields(&self, _args: &[SedonaType]) -> Result<Vec<FieldRef>> {
@@ -223,38 +222,42 @@ impl STAnalyzeAgg {
223222
#[derive(Debug)]
224223
pub struct AnalyzeAccumulator {
225224
input_type: SedonaType,
226-
_output_type: SedonaType,
227225
stats: GeoStatistics,
228226
}
229227

230228
impl AnalyzeAccumulator {
231-
pub fn new(input_type: SedonaType, output_type: SedonaType) -> Self {
229+
pub fn new(input_type: SedonaType) -> Self {
232230
Self {
233231
input_type,
234-
_output_type: output_type,
235232
stats: GeoStatistics::empty(),
236233
}
237234
}
238235

239-
pub fn update_statistics(&mut self, geom: &Wkb) -> Result<()> {
240-
// Get geometry analysis information
241-
let summary = sedona_geometry::analyze::analyze_geometry(geom)
242-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
236+
pub fn update_statistics_with_bbox(&mut self, geom: &Wkb, bbox: &BoundingBox) -> Result<()> {
237+
let summary = analyze_wkb(geom).map_err(|e| DataFusionError::External(Box::new(e)))?;
243238

244-
self.ingest_geometry_summary(&summary);
239+
self.ingest_geometry_summary(&summary, bbox);
240+
Ok(())
241+
}
245242

243+
fn update_statistics(&mut self, geom: &Wkb) -> Result<()> {
244+
let bbox =
245+
geo_traits_bounds_xy(geom).map_err(|e| exec_datafusion_err!("Bounding error: {e}"))?;
246+
let summary = analyze_wkb(geom).map_err(|e| exec_datafusion_err!("Analysis error: {e}"))?;
247+
248+
self.ingest_geometry_summary(&summary, &bbox);
246249
Ok(())
247250
}
248251

249-
pub fn ingest_geometry_summary(&mut self, summary: &GeometrySummary) {
252+
fn ingest_geometry_summary(&mut self, summary: &GeometrySummary, bbox: &BoundingBox) {
250253
// Start with a clone of the current stats
251254
let mut stats = self.stats.clone();
252255

253256
// Update each component of the statistics
254257
stats = self.update_basic_counts(stats, summary.size_bytes);
255258
stats = self.update_geometry_type_counts(stats, summary);
256259
stats = self.update_point_count(stats, summary.point_count);
257-
stats = self.update_envelope_info(stats, summary);
260+
stats = self.update_envelope_info(stats, bbox);
258261
stats = self.update_geometry_types(stats, summary.geometry_type);
259262

260263
// Assign the updated stats back to self.stats
@@ -300,14 +303,7 @@ impl AnalyzeAccumulator {
300303
}
301304

302305
// Update envelope dimensions and bounding box
303-
fn update_envelope_info(
304-
&self,
305-
stats: GeoStatistics,
306-
analysis: &GeometrySummary,
307-
) -> GeoStatistics {
308-
// The bbox is directly available on analysis, not wrapped in an Option
309-
let bbox = &analysis.bbox;
310-
306+
fn update_envelope_info(&self, stats: GeoStatistics, bbox: &BoundingBox) -> GeoStatistics {
311307
// Calculate envelope width and height from the bbox
312308
let envelope_width = if bbox.x().is_empty() {
313309
0.0

0 commit comments

Comments
 (0)