Executor #419

Open
wants to merge 49 commits into
base: main
49 commits
50623b5
Proof of concept
koerberm Nov 23, 2021
f1cda0f
Merge branch 'master' into executor
koerberm Dec 1, 2021
ad65b85
Adapted to new executor api
koerberm Dec 10, 2021
e03b9fd
Merge branch 'master' into executor
koerberm Dec 10, 2021
b622945
First draft of executor integration
koerberm Dec 10, 2021
f85fedc
Merge branch 'master' into executor
koerberm Jan 10, 2022
ec09bc3
removed clap dependency;
koerberm Jan 10, 2022
1ebc788
Clarified error message when returning a stream proxied by the executor;
koerberm Jan 10, 2022
30a6c58
Made executor API available in PRO only
koerberm Jan 12, 2022
7cd7fd0
Removed executor/task manager from non-pro version;
koerberm Jan 14, 2022
d5d159b
Fixed pro import;
koerberm Jan 14, 2022
7efd9fe
Introduced ReceiverStream<T> to the replay channel;
koerberm Jan 14, 2022
e887daa
Added test for ReceiverStream<T>
koerberm Jan 14, 2022
7894ba5
Update operators/src/pro/executor/mod.rs
koerberm Jan 17, 2022
714d3d5
reverted commit suggestion;
koerberm Jan 17, 2022
11819ad
Made unreachable error explicit;
koerberm Jan 17, 2022
6be3632
Completed test with dropped consumer;
koerberm Jan 17, 2022
eb1e3e9
Clippy & Typo;
koerberm Jan 19, 2022
b3cce36
Added docs
koerberm Jan 19, 2022
f9b207e
Made replay channel queue size configurable;
koerberm Jan 19, 2022
c23bad5
Merge remote-tracking branch 'origin/master' into executor
koerberm Jan 26, 2022
937b37d
tokio time feature now in regular dependencies;
koerberm Jan 26, 2022
1bc31f9
Merge branch 'spatial-resolution-fix' into executor
koerberm Jan 26, 2022
fe648d3
Draft for adding the QueryRectangle to the executor key;
koerberm Jan 26, 2022
3c89f70
Revert "Draft for adding the QueryRectangle to the executor key;"
koerberm Jan 28, 2022
377a1ad
Second draft for adding the QueryRectangle to the Executor Key;
koerberm Jan 28, 2022
0d4a097
Merge branch 'master' into executor
koerberm Jan 28, 2022
d45ebaa
Executor: First draft for advanced result sharing (recovered previous…
koerberm Feb 9, 2022
f34062b
Merge branch 'master' into executor
koerberm Feb 9, 2022
a6dc6c6
Added ExecutorDescriptions for Vector data;
koerberm Feb 9, 2022
23e1c4d
Fixed SpatialPartition2D::contains: Ensuring that equal instances con…
koerberm Feb 14, 2022
82d40b2
Implemented Intersects<BoundingBox2D> for all GeometryRef types;
koerberm Feb 14, 2022
851cea0
Implemented Intersects<BoundingBox2D> and Intersects<TimeInterval> fo…
koerberm Feb 14, 2022
d24134f
Added Debug trait to ExecutorTaskDescription, refined logging and docs;
koerberm Feb 14, 2022
f57d65d
New trait 'OneshotQueryProcessor' to turn a QueryProcessor into a 'st…
koerberm Feb 14, 2022
115a267
Added new png-rendering which directly consumes a stream of tiles;
koerberm Feb 14, 2022
016b0d5
Implemented ExecutorTaskDescriptions for Plots, Vector- and Raster st…
koerberm Feb 14, 2022
b4a6c0d
Added executors for all data types to the TaskManager;
koerberm Feb 14, 2022
865840e
Implemented Pro-Handlers for wms and wfs that utilize the correspondi…
koerberm Feb 14, 2022
0ecee9a
Fixed intersects implementations of primitives and added tests;
koerberm Feb 14, 2022
9534040
Added tests for OneshotQueryProcessor implementations;
koerberm Feb 14, 2022
4a40f0b
Added tests for executor task descriptions
koerberm Feb 14, 2022
c28f6f3
clippy;
koerberm Feb 14, 2022
04d1607
Merge branch 'master' into executor
koerberm Feb 14, 2022
6efff61
Snafu
koerberm Feb 14, 2022
448c9c2
Renamed can_join -> is_contained_in in ExecutorTaskDescription
koerberm Feb 28, 2022
693a17d
Introduced constant for the Executor task queue size;
koerberm Feb 28, 2022
6bd7fad
Clarified that a submitted future always returns a result;
koerberm Feb 28, 2022
4b83dea
Merge branch 'master' into executor
koerberm Feb 28, 2022
3 changes: 3 additions & 0 deletions Settings-default.toml
@@ -24,6 +24,9 @@ origin_coordinate_y = 0.0
tile_shape_pixels_x = 512
tile_shape_pixels_y = 512

[executor]
queue_size = 10

[query_context]
chunk_byte_size = 1048576 # TODO: find reasonable default
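
The diff does not show where `queue_size` is read. Going by the commit titles above ("Made replay channel queue size configurable", "Introduced constant for the Executor task queue size"), one plausible reading is that it caps a bounded channel. The sketch below illustrates that reading with the standard library only. `ExecutorConfig` and `enqueue_without_blocking` are hypothetical names, and the PR itself builds on tokio channels rather than `std::sync::mpsc`.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical stand-in for the `[executor]` settings table.
pub struct ExecutorConfig {
    pub queue_size: usize,
}

/// Tries to enqueue every task without blocking and returns how many
/// were accepted before the bounded queue filled up.
pub fn enqueue_without_blocking(config: &ExecutorConfig, tasks: &[u32]) -> usize {
    // `_rx` stays alive until the end of the function, so the channel is open
    // but nothing drains it: the buffer can hold at most `queue_size` items.
    let (tx, _rx) = sync_channel::<u32>(config.queue_size);
    let mut accepted = 0;
    for &task in tasks {
        match tx.try_send(task) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With `queue_size = 10` and no consumer, the eleventh submission is the first one rejected. A real executor would more likely wait for capacity than drop work.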

18 changes: 17 additions & 1 deletion datatypes/src/collections/feature_collection.rs
@@ -23,7 +23,7 @@ use std::ops::{Bound, RangeBounds};
use std::rc::Rc;
use std::sync::Arc;

-use crate::primitives::Coordinate2D;
+use crate::primitives::{BoundingBox2D, Coordinate2D};
use crate::primitives::{
CategoryDataRef, FeatureData, FeatureDataRef, FeatureDataType, FeatureDataValue, FloatDataRef,
Geometry, IntDataRef, TextDataRef, TimeInterval,
@@ -39,6 +39,7 @@ use crate::{
collections::{FeatureCollectionError, IntoGeometryOptionsIterator},
operations::reproject::CoordinateProjection,
};
use geo::intersects::Intersects;
use std::iter::FromIterator;

use super::{geo_feature_collection::ReplaceRawArrayCoords, GeometryCollection};
@@ -811,6 +812,21 @@ impl<'a, GeometryRef> FeatureCollectionRow<'a, GeometryRef> {
}
}

impl<'a, GR> Intersects<BoundingBox2D> for FeatureCollectionRow<'a, GR>
where
GR: Intersects<BoundingBox2D>,
{
fn intersects(&self, rhs: &BoundingBox2D) -> bool {
self.geometry.intersects(rhs)
}
}

impl<'a, GR> Intersects<TimeInterval> for FeatureCollectionRow<'a, GR> {
fn intersects(&self, rhs: &TimeInterval) -> bool {
self.time_interval.intersects(rhs)
}
}
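
The two impls above make a row answer spatial and temporal queries by delegating to its fields. A minimal, std-only sketch of that delegation pattern follows. The `Intersects` trait here is a simplified stand-in for the one in the `geo` crate, and `BBox`, `Interval`, `Point`, and `Row` are hypothetical reductions of the real types. The half-open interval semantics are an assumption, not taken from `TimeInterval`.

```rust
/// Simplified stand-in for `geo`'s `Intersects` trait.
pub trait Intersects<Rhs> {
    fn intersects(&self, rhs: &Rhs) -> bool;
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct BBox {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

/// Assumed half-open interval `[start, end)`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Interval {
    pub start: i64,
    pub end: i64,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Point {
    pub x: f64,
    pub y: f64,
}

impl Intersects<BBox> for Point {
    fn intersects(&self, rhs: &BBox) -> bool {
        self.x >= rhs.min_x && self.x <= rhs.max_x && self.y >= rhs.min_y && self.y <= rhs.max_y
    }
}

impl Intersects<Interval> for Interval {
    fn intersects(&self, rhs: &Interval) -> bool {
        self.start < rhs.end && rhs.start < self.end
    }
}

/// Hypothetical row: one geometry plus its validity interval.
pub struct Row<G> {
    pub geometry: G,
    pub time_interval: Interval,
}

// The spatial check is only available when the geometry supports it ...
impl<G: Intersects<BBox>> Intersects<BBox> for Row<G> {
    fn intersects(&self, rhs: &BBox) -> bool {
        self.geometry.intersects(rhs)
    }
}

// ... while the temporal check needs no bound on `G` at all.
impl<G> Intersects<Interval> for Row<G> {
    fn intersects(&self, rhs: &Interval) -> bool {
        self.time_interval.intersects(rhs)
    }
}
```

Because the right-hand side is a type parameter of the trait, `row.intersects(&bbox)` and `row.intersects(&interval)` resolve to different impls from the argument type alone.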

pub struct FeatureCollectionIterator<'a, GeometryIter> {
geometries: GeometryIter,
time_intervals: std::slice::Iter<'a, TimeInterval>,
11 changes: 11 additions & 0 deletions datatypes/src/primitives/bounding_box.rs
@@ -688,6 +688,17 @@ mod tests {
assert!(bbox.contains_bbox(&bbox_in));
}

#[test]
fn bounding_box_contains_bbox_equal() {
let ll = Coordinate2D::new(1.0, 1.0);
let ur = Coordinate2D::new(4.0, 4.0);
let bbox = BoundingBox2D::new(ll, ur).unwrap();

let bbox_in = BoundingBox2D::new(ll, ur).unwrap();

assert!(bbox.contains_bbox(&bbox_in));
}

#[test]
fn bounding_box_contains_bbox_overlap() {
let ll = Coordinate2D::new(1.0, 1.0);
43 changes: 41 additions & 2 deletions datatypes/src/primitives/multi_line_string.rs
@@ -3,14 +3,14 @@ use std::convert::TryFrom;
use arrow::array::{ArrayBuilder, BooleanArray};
use arrow::error::ArrowError;
use float_cmp::{ApproxEq, F64Margin};
-use geo::algorithm::intersects::Intersects;
+use geo::intersects::Intersects;
use serde::{Deserialize, Serialize};
use snafu::ensure;

use crate::collections::VectorDataType;
use crate::error::Error;
use crate::primitives::{
-error, BoundingBox2D, GeometryRef, MultiPoint, PrimitivesError, TypedGeometry,
+error, BoundingBox2D, GeometryRef, MultiPoint, PrimitivesError, SpatialBounded, TypedGeometry,
};
use crate::primitives::{Coordinate2D, Geometry};
use crate::util::arrow::{downcast_array, ArrowTyped};
@@ -280,6 +280,19 @@ impl<'g> MultiLineStringAccess for MultiLineStringRef<'g> {
&self.point_coordinates
}
}
impl<'g> SpatialBounded for MultiLineStringRef<'g> {
fn spatial_bounds(&self) -> BoundingBox2D {
let coords = self.point_coordinates.iter().flat_map(|&x| x.iter());
BoundingBox2D::from_coord_ref_iter(coords)
.expect("there must be at least one coordinate in a multilinestring")
}
}

impl<'g> Intersects<BoundingBox2D> for MultiLineStringRef<'g> {
fn intersects(&self, rhs: &BoundingBox2D) -> bool {
self.spatial_bounds().intersects_bbox(rhs)
}
}
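
Testing the bounding box instead of the line itself is cheap but conservative: it can report an intersection where the line never enters the query box. The sketch below contrasts the two for a single segment, assuming inclusive box edges. `BBox`, `segment_bounds`, and `segment_intersects_box` are hypothetical helpers, and the exact test uses Liang-Barsky clipping, which is not part of this PR.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct BBox {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

impl BBox {
    /// Inclusive overlap test between two axis-aligned boxes.
    pub fn intersects_bbox(&self, other: &BBox) -> bool {
        self.min_x <= other.max_x
            && other.min_x <= self.max_x
            && self.min_y <= other.max_y
            && other.min_y <= self.max_y
    }
}

/// Bounding box of the segment from `a` to `b`.
pub fn segment_bounds(a: (f64, f64), b: (f64, f64)) -> BBox {
    BBox {
        min_x: a.0.min(b.0),
        min_y: a.1.min(b.1),
        max_x: a.0.max(b.0),
        max_y: a.1.max(b.1),
    }
}

/// Exact segment/box test via Liang-Barsky clipping.
pub fn segment_intersects_box(a: (f64, f64), b: (f64, f64), bbox: &BBox) -> bool {
    let (dx, dy) = (b.0 - a.0, b.1 - a.1);
    let (mut t0, mut t1) = (0.0_f64, 1.0_f64);
    // Each pair (p, q) encodes one box edge as the constraint p * t <= q.
    for (p, q) in [
        (-dx, a.0 - bbox.min_x),
        (dx, bbox.max_x - a.0),
        (-dy, a.1 - bbox.min_y),
        (dy, bbox.max_y - a.1),
    ] {
        if p == 0.0 {
            if q < 0.0 {
                return false; // parallel to this edge and outside of it
            }
        } else if p < 0.0 {
            t0 = t0.max(q / p); // entering constraint
        } else {
            t1 = t1.min(q / p); // leaving constraint
        }
    }
    t0 <= t1
}
```

For the diagonal from (0, 0) to (10, 10) and a query box covering x in [0, 1], y in [8, 10], the bounding boxes overlap while the segment itself stays outside. Whether such false positives matter depends on how the executor uses the result, which this part of the diff does not show.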

impl<'g> From<MultiLineStringRef<'g>> for geojson::Geometry {
fn from(geometry: MultiLineStringRef<'g>) -> geojson::Geometry {
@@ -354,6 +367,32 @@ mod tests {
);
}

#[test]
fn test_ref_intersects() {
let coordinates = vec![vec![(0.0, 0.0).into(), (10.0, 10.0).into()]];
let multi_line_string_ref =
MultiLineStringRef::new(coordinates.iter().map(AsRef::as_ref).collect()).unwrap();

assert!(
multi_line_string_ref.intersects(&BoundingBox2D::new_unchecked(
(-1., -1.,).into(),
(11., 11.).into()
))
);
assert!(
multi_line_string_ref.intersects(&BoundingBox2D::new_unchecked(
(2., 2.,).into(),
(9., 9.).into()
))
);
assert!(
!multi_line_string_ref.intersects(&BoundingBox2D::new_unchecked(
(-2., -2.,).into(),
(-2., 12.).into()
))
);
}

#[test]
fn approx_equal() {
let a = MultiLineString::new(vec![
28 changes: 28 additions & 0 deletions datatypes/src/primitives/multi_point.rs
@@ -3,6 +3,7 @@ use std::convert::{TryFrom, TryInto};
use arrow::array::{ArrayBuilder, BooleanArray};
use arrow::error::ArrowError;
use float_cmp::{ApproxEq, F64Margin};
use geo::intersects::Intersects;
use serde::{Deserialize, Serialize};
use snafu::ensure;

@@ -286,6 +287,14 @@ where
}
}

impl<'f> Intersects<BoundingBox2D> for MultiPointRef<'f> {
fn intersects(&self, rhs: &BoundingBox2D) -> bool {
self.point_coordinates
.iter()
.any(|c| rhs.contains_coordinate(c))
}
}
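
Unlike a bounding-box comparison, this impl checks the points themselves, so a multi-point whose points straddle the query box does not count as intersecting. A small sketch of the difference, with hypothetical std-only helpers (`BBox`, `any_point_in_box`, `point_bounds_overlap_box`) and inclusive box edges assumed:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct BBox {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

impl BBox {
    /// Inclusive on all four edges (an assumption for this sketch).
    pub fn contains_coordinate(&self, (x, y): (f64, f64)) -> bool {
        x >= self.min_x && x <= self.max_x && y >= self.min_y && y <= self.max_y
    }
}

/// Exact test: true if at least one point lies inside the box.
pub fn any_point_in_box(points: &[(f64, f64)], bbox: &BBox) -> bool {
    points.iter().any(|&p| bbox.contains_coordinate(p))
}

/// Coarser alternative: compares the bounding box of the points with the query box.
pub fn point_bounds_overlap_box(points: &[(f64, f64)], bbox: &BBox) -> bool {
    let (mut min_x, mut min_y) = (f64::INFINITY, f64::INFINITY);
    let (mut max_x, mut max_y) = (f64::NEG_INFINITY, f64::NEG_INFINITY);
    for &(x, y) in points {
        min_x = min_x.min(x);
        min_y = min_y.min(y);
        max_x = max_x.max(x);
        max_y = max_y.max(y);
    }
    min_x <= bbox.max_x && bbox.min_x <= max_x && min_y <= bbox.max_y && bbox.min_y <= max_y
}
```

For the points (-0.1, -0.1) and (1.1, 1.1) and the unit box, the exact test returns `false` while the bounds-based one returns `true`.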

impl ApproxEq for &MultiPoint {
type Margin = F64Margin;

@@ -392,4 +401,23 @@ mod tests {

assert!(!approx_eq!(&MultiPoint, &a, &b, F64Margin::default()));
}

#[test]
fn ref_intersects_bbox() -> Result<()> {
let bbox = BoundingBox2D::new((0.0, 0.0).into(), (1.0, 1.0).into())?;

let v1: Vec<Coordinate2D> = vec![(0.5, 0.5).into()];
let v2: Vec<Coordinate2D> = vec![(1.0, 1.0).into()];
let v3: Vec<Coordinate2D> = vec![(0.5, 0.5).into(), (1.5, 1.5).into()];
let v4: Vec<Coordinate2D> = vec![(1.1, 1.1).into()];
let v5: Vec<Coordinate2D> = vec![(-0.1, -0.1).into(), (1.1, 1.1).into()];

assert!(MultiPointRef::new(&v1)?.intersects(&bbox));
assert!(MultiPointRef::new(&v2)?.intersects(&bbox));
assert!(MultiPointRef::new(&v3)?.intersects(&bbox));
assert!(!MultiPointRef::new(&v4)?.intersects(&bbox));
assert!(!MultiPointRef::new(&v5)?.intersects(&bbox));

Ok(())
}
}
64 changes: 63 additions & 1 deletion datatypes/src/primitives/multi_polygon.rs
@@ -10,7 +10,8 @@ use snafu::ensure;
use crate::collections::VectorDataType;
use crate::error::Error;
use crate::primitives::{
-error, BoundingBox2D, GeometryRef, MultiLineString, PrimitivesError, TypedGeometry,
+error, BoundingBox2D, GeometryRef, MultiLineString, PrimitivesError, SpatialBounded,
+TypedGeometry,
};
use crate::primitives::{Coordinate2D, Geometry};
use crate::util::arrow::{downcast_array, ArrowTyped};
@@ -356,6 +357,25 @@ impl<'g> MultiPolygonAccess for MultiPolygonRef<'g> {
}
}

impl<'g> SpatialBounded for MultiPolygonRef<'g> {
fn spatial_bounds(&self) -> BoundingBox2D {
let outer_ring_coords = self
.polygons
.iter()
// Use exterior ring (first ring of a polygon)
.filter_map(|p| p.iter().next())
.flat_map(|&exterior| exterior.iter());
BoundingBox2D::from_coord_ref_iter(outer_ring_coords)
.expect("there must be at least one coordinate in a multipolygon")
}
}

impl<'g> Intersects<BoundingBox2D> for MultiPolygonRef<'g> {
fn intersects(&self, rhs: &BoundingBox2D) -> bool {
self.spatial_bounds().intersects_bbox(rhs)
}
}
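
Using only the exterior rings for the bounds is sufficient as long as every interior ring (hole) lies inside its exterior ring, which this sketch assumes. It also shows a consequence of the bounding-box approach: a query box that sits entirely inside a hole still counts as intersecting. `BBox`, `Ring`, `Polygon`, `exterior_bounds`, and `all_ring_bounds` are hypothetical std-only stand-ins for the crate's types.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct BBox {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

impl BBox {
    /// Smallest box around the coordinates, or `None` for an empty iterator.
    pub fn from_coords<'a>(mut coords: impl Iterator<Item = &'a (f64, f64)>) -> Option<BBox> {
        let &(x, y) = coords.next()?;
        let mut b = BBox { min_x: x, min_y: y, max_x: x, max_y: y };
        for &(x, y) in coords {
            b.min_x = b.min_x.min(x);
            b.min_y = b.min_y.min(y);
            b.max_x = b.max_x.max(x);
            b.max_y = b.max_y.max(y);
        }
        Some(b)
    }

    /// Inclusive overlap test between two axis-aligned boxes.
    pub fn intersects_bbox(&self, other: &BBox) -> bool {
        self.min_x <= other.max_x
            && other.min_x <= self.max_x
            && self.min_y <= other.max_y
            && other.min_y <= self.max_y
    }
}

pub type Ring = Vec<(f64, f64)>;
/// First ring is the exterior, any further rings are holes.
pub type Polygon = Vec<Ring>;

/// Bounds computed from the exterior ring of each polygon only.
pub fn exterior_bounds(polygons: &[Polygon]) -> Option<BBox> {
    BBox::from_coords(
        polygons
            .iter()
            .filter_map(|polygon| polygon.first())
            .flat_map(|exterior| exterior.iter()),
    )
}

/// Bounds computed from every ring, for comparison.
pub fn all_ring_bounds(polygons: &[Polygon]) -> Option<BBox> {
    BBox::from_coords(polygons.iter().flatten().flatten())
}
```

For a 10 x 10 square with a hole spanning 4 to 6 on both axes, both functions return the same box, and a query box from (4.5, 4.5) to (5.5, 5.5) overlaps it even though it touches no polygon area.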

impl<'g> From<MultiPolygonRef<'g>> for geojson::Geometry {
fn from(geometry: MultiPolygonRef<'g>) -> geojson::Geometry {
geojson::Geometry::new(match geometry.polygons.len() {
@@ -481,6 +501,48 @@ mod tests {
assert_eq!(aggregate(&multi_polygon), aggregate(&multi_polygon_ref));
}

#[test]
fn test_ref_intersects() {
let coordinates = vec![vec![
vec![
(0.0, 0.0).into(),
(10.0, 0.0).into(),
(10.0, 10.0).into(),
(0.0, 10.0).into(),
(0.0, 0.0).into(),
],
vec![
(4.0, 4.0).into(),
(6.0, 4.0).into(),
(6.0, 6.0).into(),
(4.0, 6.0).into(),
(4.0, 4.0).into(),
],
]];
let multi_polygon_ref = MultiPolygonRef::new(
coordinates
.iter()
.map(|r| r.iter().map(AsRef::as_ref).collect())
.collect(),
)
.unwrap();

assert!(multi_polygon_ref.intersects(&BoundingBox2D::new_unchecked(
(-1., -1.,).into(),
(11., 11.).into()
)));

assert!(multi_polygon_ref.intersects(&BoundingBox2D::new_unchecked(
(4.5, 4.5,).into(),
(5.5, 5.5).into()
)));

assert!(!multi_polygon_ref.intersects(&BoundingBox2D::new_unchecked(
(-11., -1.,).into(),
(-1., 11.).into()
)));
}

#[test]
fn approx_equal() {
let a = MultiPolygon::new(vec![
7 changes: 7 additions & 0 deletions datatypes/src/primitives/no_geometry.rs
@@ -4,6 +4,7 @@ use std::convert::TryFrom;
use arrow::array::{Array, ArrayBuilder, ArrayData, ArrayRef, BooleanArray, JsonEqual};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use geo::prelude::Intersects;
use serde::{Deserialize, Serialize};
use serde_json::Value;

@@ -26,6 +27,12 @@ impl Geometry for NoGeometry {
}
}

impl Intersects<BoundingBox2D> for NoGeometry {
fn intersects(&self, _rhs: &BoundingBox2D) -> bool {
true
}
}

impl GeometryRef for NoGeometry {}

impl TryFrom<TypedGeometry> for NoGeometry {
16 changes: 12 additions & 4 deletions datatypes/src/primitives/spatial_partition.rs
@@ -167,23 +167,23 @@ impl SpatialPartition2D {
}

fn contains_x(&self, other: &Self) -> bool {
-crate::util::ranges::value_in_range(
+crate::util::ranges::value_in_range_inclusive(
other.upper_left_coordinate.x,
self.upper_left_coordinate.x,
self.lower_right_coordinate.x,
-) && crate::util::ranges::value_in_range(
+) && crate::util::ranges::value_in_range_inclusive(
other.lower_right_coordinate.x,
self.upper_left_coordinate.x,
self.lower_right_coordinate.x,
)
}

fn contains_y(&self, other: &Self) -> bool {
-crate::util::ranges::value_in_range_inv(
+crate::util::ranges::value_in_range_inclusive(
other.lower_right_coordinate.y,
self.lower_right_coordinate.y,
self.upper_left_coordinate.y,
-) && crate::util::ranges::value_in_range_inv(
+) && crate::util::ranges::value_in_range_inclusive(
other.upper_left_coordinate.y,
self.lower_right_coordinate.y,
self.upper_left_coordinate.y,
@@ -305,6 +305,14 @@ mod tests {
assert!(!p2.contains(&p1));
}

#[test]
fn it_contains_itself() {
let p1 = SpatialPartition2D::new_unchecked((0., 1.).into(), (1., 0.).into());
let p2 = SpatialPartition2D::new_unchecked((0., 1.).into(), (1., 0.).into());
assert!(p1.contains(&p2));
assert!(p2.contains(&p1));
}
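
The switch to `value_in_range_inclusive` is what lets a partition contain an identical one. The bodies of the range helpers are not part of this diff, so the sketch below assumes the old check excluded one end of the range and the new one includes both. `Partition` and the two helper functions are hypothetical names.

```rust
/// Assumed behaviour of a half-open check: `min <= value < max`.
pub fn in_range_half_open(value: f64, min: f64, max: f64) -> bool {
    value >= min && value < max
}

/// Assumed behaviour of the inclusive check: `min <= value <= max`.
pub fn in_range_inclusive(value: f64, min: f64, max: f64) -> bool {
    value >= min && value <= max
}

/// Hypothetical partition with y growing upwards, so `top >= bottom`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Partition {
    pub left: f64,
    pub top: f64,
    pub right: f64,
    pub bottom: f64,
}

impl Partition {
    /// `other` is contained if all four of its edges lie within `self`, borders included.
    pub fn contains(&self, other: &Partition) -> bool {
        in_range_inclusive(other.left, self.left, self.right)
            && in_range_inclusive(other.right, self.left, self.right)
            && in_range_inclusive(other.bottom, self.bottom, self.top)
            && in_range_inclusive(other.top, self.bottom, self.top)
    }
}
```

With a half-open check the right edge of an identical partition falls outside the range, so `p.contains(&p)` would be `false`. The inclusive check makes containment reflexive.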

#[test]
fn it_contains_coord() {
let p1 = SpatialPartition2D::new_unchecked((0., 1.).into(), (1., 0.).into());
1 change: 1 addition & 0 deletions operators/Cargo.toml
@@ -52,6 +52,7 @@ uuid = { version = "0.8", features = ["serde", "v4", "v5"] }
[dev-dependencies]
async-stream = "0.3"
geo-rand = { git = "https://github.com/lelongg/geo-rand", tag = "v0.3.0" }
tokio-util = "0.6"
rand = "0.8"


8 changes: 8 additions & 0 deletions operators/src/error.rs
@@ -270,10 +270,18 @@ pub enum Error {
source: crate::util::statistics::StatisticsError,
},

#[cfg(feature = "pro")]
#[snafu(display("Executor error: {}", source))]
Executor {
#[snafu(implicit)]
source: crate::pro::executor::error::ExecutorError,
},

#[snafu(display("SparseTilesFillAdapter error: {}", source))]
SparseTilesFillAdapter {
source: crate::adapters::SparseTilesFillAdapterError,
},

#[snafu(context(false))]
ExpressionOperator {
source: crate::processing::ExpressionError,
39 changes: 39 additions & 0 deletions operators/src/pro/executor/error.rs
@@ -0,0 +1,39 @@
use snafu::Snafu;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::error::RecvError;
use tokio::task::JoinError;

pub type Result<T> = std::result::Result<T, ExecutorError>;

#[derive(Debug, Clone, Snafu)]
pub enum ExecutorError {
Submission { message: String },
Panic,
Cancelled,
}

impl From<JoinError> for ExecutorError {
fn from(src: JoinError) -> Self {
if src.is_cancelled() {
ExecutorError::Cancelled
} else {
ExecutorError::Panic
}
}
}

impl<T> From<SendError<T>> for ExecutorError {
fn from(e: SendError<T>) -> Self {
Self::Submission {
message: e.to_string(),
}
}
}

impl From<RecvError> for ExecutorError {
fn from(e: RecvError) -> Self {
Self::Submission {
message: e.to_string(),
}
}
}
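
The `From` impls exist so that call sites can use `?` on channel operations and get an `ExecutorError` back. The sketch below shows the same pattern with the standard library's channel error types in place of tokio's. `submit` and `await_result` are hypothetical helpers, and the `JoinError` mapping to `Panic` or `Cancelled` is left out because `std` has no direct equivalent.

```rust
use std::sync::mpsc::{Receiver, RecvError, SendError, Sender};

#[derive(Debug, Clone, PartialEq)]
pub enum ExecutorError {
    Submission { message: String },
    Panic,
    Cancelled,
}

// std analogue of the tokio `SendError<T>` conversion in the diff.
impl<T> From<SendError<T>> for ExecutorError {
    fn from(e: SendError<T>) -> Self {
        Self::Submission { message: e.to_string() }
    }
}

// std analogue of the tokio oneshot `RecvError` conversion in the diff.
impl From<RecvError> for ExecutorError {
    fn from(e: RecvError) -> Self {
        Self::Submission { message: e.to_string() }
    }
}

/// Hypothetical submission helper: `?` converts a `SendError` automatically.
pub fn submit(tx: &Sender<u32>, value: u32) -> Result<(), ExecutorError> {
    tx.send(value)?;
    Ok(())
}

/// Hypothetical result helper: `?` converts a `RecvError` automatically.
pub fn await_result(rx: &Receiver<u32>) -> Result<u32, ExecutorError> {
    Ok(rx.recv()?)
}
```

Once the receiving side is dropped, `submit` returns `ExecutorError::Submission` carrying the channel's own error text. Both channel failures collapse into one variant, which keeps the enum small at the cost of losing the distinction between send and receive errors.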