Skip to content

Executor #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 49 commits into
base: main
Choose a base branch
from
Open

Executor #419

wants to merge 49 commits into from

Conversation

koerberm
Copy link
Contributor

This is a first draft of the executor integration. I implemented a noop executor that is used when pro-feature is inactive.
There is a sample application in the "plots" endpoint.

@koerberm
Copy link
Contributor Author

@coveralls
Copy link
Collaborator

coveralls commented Dec 10, 2021

Pull Request Test Coverage Report for Build 1914911059

  • 953 of 1147 (83.09%) changed or added relevant lines in 24 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-0.03%) to 85.327%

Changes Missing Coverage Covered Lines Changed/Added Lines %
datatypes/src/primitives/multi_polygon.rs 32 33 96.97%
services/src/handlers/wfs.rs 12 13 92.31%
services/src/pro/util/tests.rs 0 1 0.0%
operators/src/pro/executor/operators.rs 68 70 97.14%
operators/src/util/raster_stream_to_png.rs 8 10 80.0%
services/src/pro/contexts/postgres.rs 1 4 25.0%
services/src/pro/server.rs 0 3 0.0%
services/src/handlers/wms.rs 10 14 71.43%
services/src/pro/contexts/in_memory.rs 1 5 20.0%
operators/src/pro/executor/error.rs 0 8 0.0%
Totals Coverage Status
Change from base Build 1893423386: -0.03%
Covered Lines: 30826
Relevant Lines: 36127

💛 - Coveralls

@ChristianBeilschmidt ChristianBeilschmidt marked this pull request as ready for review January 19, 2022 13:41
Restricted Hash and Eq implementation to concrete Key type
# Conflicts:
#	operators/src/error.rs
# Conflicts:
#	services/src/error.rs
Copy link
Member

@ChristianBeilschmidt ChristianBeilschmidt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to work 😄

2x NDVI Raster

2022-02-17T08:29:49.881696Z DEBUG geoengine_operators::pro::executor: Starting new computation for request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 33.75, y: 0.0 }, lower_right_coordinate: Coordinate2D { x: 45.0, y: -11.25 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }    
2022-02-17T08:29:49.881885Z DEBUG geoengine_operators::pro::executor: Joining running computation for request. New: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 33.75, y: 0.0 }, lower_right_coordinate: Coordinate2D { x: 45.0, y: -11.25 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }, Running: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 33.75, y: 0.0 }, lower_right_coordinate: Coordinate2D { x: 45.0, y: -11.25 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }    
2022-02-17T08:29:49.884395Z DEBUG geoengine_operators::pro::executor: Computation finished. Notifying consumer streams. Request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 33.75, y: 0.0 }, lower_right_coordinate: Coordinate2D { x: 45.0, y: -11.25 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }   

Raster-Vector-Join of NDVI and Natural Earth Points, then 2x Histogram:

2022-02-17T08:34:16.807763Z DEBUG geoengine_operators::pro::executor: Starting new computation for request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 45.0, y: 22.5 }, lower_right_coordinate: Coordinate2D { x: 56.25, y: 11.25 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }    
2022-02-17T08:34:16.809175Z DEBUG geoengine_operators::pro::executor: Stream progressed too far or results do not cover requested result. Starting new computation for request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 33.75, y: 11.25 }, lower_right_coordinate: Coordinate2D { x: 45.0, y: 0.0 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }    
2022-02-17T08:34:16.809332Z DEBUG geoengine_operators::pro::executor: Stream progressed too far or results do not cover requested result. Starting new computation for request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 45.0, y: 45.0 }, lower_right_coordinate: Coordinate2D { x: 56.25, y: 33.75 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }    
2022-02-17T08:34:16.809545Z DEBUG geoengine_operators::pro::executor: Stream progressed too far or results do not cover requested result. Starting new computation for request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 0.0, y: 56.25 }, lower_right_coordinate: Coordinate2D { x: 11.25, y: 45.0 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }    
2022-02-17T08:34:16.810209Z DEBUG geoengine_operators::pro::executor: Computation finished. Notifying consumer streams. Request: RasterTaskDescription { id: WorkflowId(8df9b0e6-e4b4-586e-90a3-6cf0f08c4e62), spatial_bounds: SpatialPartition2D { upper_left_coordinate: Coordinate2D { x: 45.0, y: 22.5 }, lower_right_coordinate: Coordinate2D { x: 56.25, y: 11.25 } }, temporal_bounds: TimeInterval [1396353600000, 1396353600000), _p: PhantomData }

Maybe we need some experiments when it joins and when this does not work. Maybe chunk sizes for vector data are too small 🤷 .

fn slice_result(&self, result: &Self::ResultType) -> Option<Self::ResultType> {
Some(match result {
Ok(wpo) => Ok(wpo.clone()),
Err(_e) => Err(crate::error::Error::NotYetImplemented),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be necessary to do in case of an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that our Error type is not cloneable. So I cannot return the original error received from the computation.
Either we find a way to make our error cloneable (which is impossible due to wrapped errors I think), or we introduce a new error type that contains e.g, a meaningful message .

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess this is a huge topic to discuss.

@koerberm koerberm mentioned this pull request Mar 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants