Skip to content

force import using gti #1049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: new-pixel-based-queries-rebase
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ float-cmp = "0.10"
futures = "0.3"
futures-util = "0.3"
gdal = "0.18"
gdal-sys = "0.11"
gdal-sys = { version = "0.11", features = ["bundled"] }
gdal-src = { version = "0.2.0+3.10.2", features = ["driver_gpkg"] }

# when changing geo version also adapt the Cargo.toml in the expression "deps-workspace"!
geo = "0.30.0"
geo-rand = { git = "https://github.com/lelongg/geo-rand.git", branch = "dependabot/cargo/geo-0.30.0"} # TODO: revert back to "0.5" when it is released
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use geoengine_datatypes::{
primitives::TimeInstance,
raster::{Pixel, RasterTile2D, TileInformation},
};
use log::debug;
use pin_project::pin_project;
use rayon::ThreadPool;
use std::marker::PhantomData;
Expand Down Expand Up @@ -289,6 +290,7 @@ where
}
Ok(None) => this.state.set(StateInner::ReturnResult(None)),
Err(e) => {
debug!(">>>> Tile query rectangle not valid: {e}");
this.state.set(StateInner::Ended);
return Poll::Ready(Some(Err(e)));
}
Expand Down Expand Up @@ -382,6 +384,7 @@ where

// If there is a tile, set the current_time_end option.
if let Some(tile) = &tile_option {
debug_assert!(tile.time.end() > tile.time.start());
debug_assert!(*this.current_time_start >= tile.time.start());
*this.current_time_end = Some(tile.time.end());
}
Expand All @@ -408,6 +411,7 @@ where
(None, None) => {
// end the stream since we never recieved a tile from any subquery. Should only happen if we end the first grid iteration.
// NOTE: this assumes that the input operator produces no data tiles for queries where time and space are valid but no data is avalable.
debug!(">>>> Tile stream ended without any data");
debug_assert!(&tile_option.is_none());
debug_assert!(
*this.current_time_start == this.query_rect_to_answer.time_interval.start()
Expand Down Expand Up @@ -437,7 +441,9 @@ where
}
}
}

if let Some(tile) = &tile_option {
debug_assert!(tile.time.end() > tile.time.start());
}
Poll::Ready(Some(Ok(tile_option)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,25 @@ where
self.state.out_spatial_grid.geo_transform()
);

let valid_pixel_bounds = self
.state
.out_spatial_grid
.grid_bounds()
.intersection(&tile_info.global_pixel_bounds())
.and_then(|b| b.intersection(&query_rect.spatial_query.grid_bounds()));
// TODO: instead of producing an empty stream if the query does not intersect the data or projection,
// we need to actually perform the query in order to give the nodata tiles the correct time interval
// because all tiles in a time step need the same temporal validity, even across diferent quereis

// so we need a way to reliably query the source in such cases that ensures no data tiles are produced.
// what to do in cases where the bbox coordinates cannot be reprojected?
// - query the data bbox but discard the data?
// - implement a special empty bbox query that makes sure to produce empty tiles but correct time intervals?
// - ideally: implement a way to query the time intervals of the source data and produce empty tiles accordingly

let valid_pixel_bounds = dbg!(
dbg!(
self.state
.out_spatial_grid
.grid_bounds()
.intersection(&tile_info.global_pixel_bounds())
)
.and_then(|b| b.intersection(&query_rect.spatial_query.grid_bounds()))
);

let valid_spatial_bounds = valid_pixel_bounds.map(|pb| {
self.state
Expand All @@ -114,20 +127,25 @@ where
let projected_bounds = bounds.reproject(&proj);

match projected_bounds {
Ok(pb) => Ok(Some(RasterQueryRectangle::new_with_grid_bounds(
self.state
.in_spatial_grid
.geo_transform()
.spatial_to_grid_bounds(&pb),
TimeInterval::new_instant(start_time)?,
band_idx.into(),
))),
Ok(pb) => {
dbg!("produce something");
Ok(Some(RasterQueryRectangle::new_with_grid_bounds(
self.state
.in_spatial_grid
.geo_transform()
.spatial_to_grid_bounds(&pb),
TimeInterval::new_instant(start_time)?,
band_idx.into(),
)))
}
// In some strange cases the reprojection can return an empty box.
// We ignore it since it contains no pixels.
Err(geoengine_datatypes::error::Error::OutputBboxEmpty { bbox: _ }) => Ok(None),
Err(e) => Err(e.into()),
}
} else {
dbg!("output query rectangle is not valid in source projection => produce empty tile");

// output query rectangle is not valid in source projection => produce empty tile
Ok(None)
}
Expand Down Expand Up @@ -352,6 +370,8 @@ impl<T: Pixel> FoldTileAccu for TileWithProjectionCoordinates<T> {
type RasterType = T;

async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
debug_assert!(self.accu_tile.time.end() > self.accu_tile.time.start());
debug_assert!(self.accu_tile.time.end() != self.accu_tile.time.start() + 1);
Ok(self.accu_tile)
}

Expand All @@ -362,6 +382,7 @@ impl<T: Pixel> FoldTileAccu for TileWithProjectionCoordinates<T> {

impl<T: Pixel> FoldTileAccuMut for TileWithProjectionCoordinates<T> {
fn set_time(&mut self, time: TimeInterval) {
debug_assert!(time.end() > time.start());
self.accu_tile.time = time;
}

Expand Down
86 changes: 74 additions & 12 deletions operators/src/adapters/sparse_tiles_fill_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use geoengine_datatypes::{
use pin_project::pin_project;
use snafu::Snafu;
use std::{pin::Pin, task::Poll};
use tracing::debug;

#[derive(Debug, Snafu)]
pub enum SparseTilesFillAdapterError {
Expand Down Expand Up @@ -61,7 +62,11 @@ impl From<TimeInterval> for FillerTimeBounds {
fn from(time: TimeInterval) -> FillerTimeBounds {
FillerTimeBounds {
start: time.start(),
end: time.end(),
end: if time.is_instant() {
time.end() + 1
} else {
time.end()
},
}
}
}
Expand All @@ -74,11 +79,14 @@ impl FillerTimeBounds {
self.end
}

// TODO: return result
pub fn new(start: TimeInstance, end: TimeInstance) -> Self {
debug_assert!(start < end);
Self::new_unchecked(start, end)
}

pub fn new_unchecked(start: TimeInstance, end: TimeInstance) -> Self {
debug_assert!(start < end);
Self { start, end }
}
}
Expand Down Expand Up @@ -107,6 +115,12 @@ struct GridIdxAndBand {
impl<T: Pixel> StateContainer<T> {
/// Create a new no-data `RasterTile2D` with `GridIdx` and time from the current state
fn current_no_data_tile(&self) -> RasterTile2D<T> {
// debug_assert!(
// self.current_time.unwrap().end() != self.current_time.unwrap().start() + 1,
// "current no data tile, current_time: {:?}, current tile time: {:?}",
// self.current_time,
// self.next_tile.as_ref().map(|t| t.time)
// );
RasterTile2D::new(
self.current_time
.expect("time must exist when a tile is stored."),
Expand Down Expand Up @@ -231,6 +245,12 @@ impl<T: Pixel> StateContainer<T> {
}

fn next_time_interval_from_stored_tile(&self) -> Option<TimeInterval> {
debug!(
"filladapter ding current time {:?}, stored tile time {:?}",
self.current_time,
self.next_tile.as_ref().map(|t| t.time)
);

// we wrapped around. We need to do time progress.
if let Some(tile) = &self.next_tile {
let stored_tile_time = tile.time;
Expand Down Expand Up @@ -263,6 +283,7 @@ impl<T: Pixel> StateContainer<T> {
}

fn set_current_time_from_initial_tile(&mut self, first_tile_time: TimeInterval) {
debug_assert!(first_tile_time.end() > first_tile_time.start());
// if we know a bound we must use it to set the current time
let start_data_bound = self.data_time_bounds.start();
let requested_start = self.requested_time_bounds.start();
Expand All @@ -279,10 +300,9 @@ impl<T: Pixel> StateContainer<T> {
requested_start,
start_data_bound
);
self.current_time = Some(TimeInterval::new_unchecked(
start_data_bound,
first_tile_time.start(),
));
self.current_time =
Some(TimeInterval::new(start_data_bound, first_tile_time.start()).unwrap());
debug_assert!(!self.current_time.unwrap().is_instant());
return;
}
if start_data_bound > first_tile_time.start() {
Expand All @@ -297,10 +317,10 @@ impl<T: Pixel> StateContainer<T> {

fn set_current_time_from_data_time_bounds(&mut self) {
assert!(self.state == State::FillToEnd);
self.current_time = Some(TimeInterval::new_unchecked(
self.data_time_bounds.start(),
self.data_time_bounds.end(),
));
self.current_time = Some(
TimeInterval::new(self.data_time_bounds.start(), self.data_time_bounds.end()).unwrap(),
);
debug_assert!(!self.current_time.unwrap().is_instant());
}

fn update_current_time(&mut self, new_time: TimeInterval) {
Expand Down Expand Up @@ -335,6 +355,8 @@ impl<T: Pixel> StateContainer<T> {

debug_assert!(current_time.end() < self.data_time_bounds.end());

debug_assert!(self.requested_time_bounds.end() <= self.data_time_bounds.end());

let new_time = if current_time.is_instant() {
TimeInterval::new_unchecked(current_time.end() + 1, self.data_time_bounds.end())
} else {
Expand Down Expand Up @@ -363,10 +385,15 @@ impl<T: Pixel> StateContainer<T> {
}

fn store_tile(&mut self, tile: RasterTile2D<T>) {
debug_assert!(tile.time.end() > tile.time.start());

debug_assert!(self.next_tile.is_none());
let current_time = self
.current_time
.expect("Time must be set when the first tile arrives");

debug_assert!(current_time.end() > current_time.start());

debug_assert!(current_time.start() <= tile.time.start());
debug_assert!(
current_time.start() < tile.time.start()
Expand Down Expand Up @@ -481,6 +508,11 @@ where
// poll for a first (input) tile
let result_tile = match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(tile)) => {
debug!(
"Initial tile: {:?} with time interval {:?}, currentime: {:?}",
tile.tile_position, tile.time, this.sc.current_time
);
debug_assert!(tile.time.end() > tile.time.start());
// now we have to inspect the time we got and the bound we need to fill. If there are bounds known, then we need to check if the tile starts with the bounds.
this.sc.set_current_time_from_initial_tile(tile.time);

Expand All @@ -494,11 +526,24 @@ where
tile.band,
)
{
debug!(
"AAA Initial tile: {:?} with time interval {:?}, currentime: {:?}",
tile.tile_position, tile.time, this.sc.current_time
);
this.sc.state = State::PollingForNextTile; // return the received tile and set state to polling for the next tile
tile
} else {
this.sc.store_tile(tile);
debug!(
"BBB Initial tile: {:?} with time interval {:?}, currentime: {:?}",
tile.tile_position, tile.time, this.sc.current_time
);
this.sc.store_tile(tile.clone());
this.sc.state = State::FillAndProduceNextTile; // save the tile and go to fill mode

debug!(
"CCC Initial tile: {:?} with time interval {:?}, currentime: {:?}",
tile.tile_position, tile.time, this.sc.current_time
);
this.sc.current_no_data_tile()
}
}
Expand All @@ -508,13 +553,17 @@ where
return Poll::Ready(Some(Err(e)));
}
// the source never produced a tile.
// TODO: this should never happen??
None => {
debug_assert!(this.sc.current_idx == min_idx);
this.sc.state = State::FillToEnd;
this.sc.set_current_time_from_data_time_bounds();
this.sc.current_no_data_tile()
}
};

debug_assert!(result_tile.time.end() > result_tile.time.start());

// move the current_idx. There is no need to do time progress here. Either a new tile triggers that or it is never needed for an empty source.
this.sc.current_idx = wrapped_next_idx;
this.sc.current_band_idx = wrapped_next_band;
Expand All @@ -535,6 +584,17 @@ where

let res = match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(tile)) => {
debug_assert!(
tile.time.end() > tile.time.start(),
"Tile time interval is invalid: {:?}",
tile.time
);

debug!(
"DDD next tile: {:?} with time interval {:?}, currentime: {:?}",
tile.tile_position, tile.time, this.sc.current_time
);

// 1. The start of the recieved TimeInterval MUST NOT BE before the start of the current TimeInterval.
if this.sc.time_starts_before_current_state(tile.time) {
this.sc.state = State::Ended;
Expand All @@ -548,9 +608,10 @@ where
}
if tile.time.start() >= this.sc.requested_time_bounds.end() {
log::warn!(
"The tile time start ({}) is outside of the requested time bounds ({})!",
"The tile time start ({}) is outside of the requested time bounds ({})! end is {}",
tile.time.start(),
this.sc.requested_time_bounds.end()
this.sc.requested_time_bounds.end(),
tile.time.end()
);
}

Expand All @@ -575,6 +636,7 @@ where
if this.sc.time_starts_equals_current_state(tile.time)
&& !this.sc.time_duration_equals_current_state(tile.time)
{
debug!("missmatch tile is empty: {}", tile.is_empty());
this.sc.state = State::Ended;
return Poll::Ready(Some(Err(
SparseTilesFillAdapterError::TileTimeIntervalLengthMissmatch {
Expand Down
2 changes: 1 addition & 1 deletion operators/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ pub enum Error {
InterpolationOperator {
source: crate::processing::InterpolationError,
},
#[snafu(context(false))]
#[snafu(display("Downsampling error: {source}"), context(false))]
DownsampleOperator {
source: crate::processing::DownsamplingError,
},
Expand Down
Loading
Loading