Skip to content

Commit 032b398

Browse files
author
Marco Mancini
committed
Add OGC API item and map
1 parent 3734918 commit 032b398

File tree

15 files changed

+630
-79
lines changed

15 files changed

+630
-79
lines changed

api/app/endpoint_handlers/dataset.py

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import pika
44
from typing import Optional
55

6-
from dbmanager.dbmanager import DBManager
6+
from fastapi.responses import FileResponse
7+
8+
from dbmanager.dbmanager import DBManager, RequestStatus
79
from geoquery.geoquery import GeoQuery
810
from geoquery.task import TaskList
911
from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB
@@ -18,12 +20,18 @@
1820
from api_utils import make_bytes_readable_dict
1921
from validation import assert_product_exists
2022

23+
from . import request
2124

2225
log = get_dds_logger(__name__)
2326
data_store = Datastore()
2427

2528
MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"]
2629

30+
def _is_etimate_enabled(dataset_id, product_id):
31+
if dataset_id in ("sentinel-2",):
32+
return False
33+
return True
34+
2735

2836
@log_execution_time(log)
2937
def get_datasets(user_roles_names: list[str]) -> list[dict]:
@@ -213,7 +221,7 @@ def estimate(
213221

214222
@log_execution_time(log)
215223
@assert_product_exists
216-
def query(
224+
def async_query(
217225
user_id: str,
218226
dataset_id: str,
219227
product_id: str,
@@ -250,21 +258,22 @@ def query(
250258
251259
"""
252260
log.debug("geoquery: %s", query)
253-
estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
254-
allowed_size = data_store.product_metadata(dataset_id, product_id).get(
255-
"maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
256-
)
257-
if estimated_size > allowed_size:
258-
raise exc.MaximumAllowedSizeExceededError(
259-
dataset_id=dataset_id,
260-
product_id=product_id,
261-
estimated_size_gb=estimated_size,
262-
allowed_size_gb=allowed_size,
263-
)
264-
if estimated_size == 0.0:
265-
raise exc.EmptyDatasetError(
266-
dataset_id=dataset_id, product_id=product_id
261+
if _is_etimate_enabled(dataset_id, product_id):
262+
estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
263+
allowed_size = data_store.product_metadata(dataset_id, product_id).get(
264+
"maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
267265
)
266+
if estimated_size > allowed_size:
267+
raise exc.MaximumAllowedSizeExceededError(
268+
dataset_id=dataset_id,
269+
product_id=product_id,
270+
estimated_size_gb=estimated_size,
271+
allowed_size_gb=allowed_size,
272+
)
273+
if estimated_size == 0.0:
274+
raise exc.EmptyDatasetError(
275+
dataset_id=dataset_id, product_id=product_id
276+
)
268277
broker_conn = pika.BlockingConnection(
269278
pika.ConnectionParameters(
270279
host=os.getenv("BROKER_SERVICE_HOST", "broker")
@@ -295,6 +304,68 @@ def query(
295304
broker_conn.close()
296305
return request_id
297306

307+
@log_execution_time(log)
@assert_product_exists
def sync_query(
    user_id: str,
    dataset_id: str,
    product_id: str,
    query: GeoQuery,
):
    """Realize the logic for the endpoint:

    `POST /datasets/{dataset_id}/{product_id}/execute`

    Submit the query asynchronously, poll the request status until it
    leaves a non-terminal state, and return the produced file.

    Parameters
    ----------
    user_id : str
        ID of the user executing the query
    dataset_id : str
        ID of the dataset
    product_id : str
        ID of the product
    query : GeoQuery
        Query to perform

    Returns
    -------
    response : FileResponse
        The file produced by the request, served as a download
        (the original docstring wrongly claimed an ``int`` request ID)

    Raises
    -------
    MaximumAllowedSizeExceededError
        if the allowed size is below the estimated one
    EmptyDatasetError
        if estimated size is zero
    ProductRetrievingError
        if the request finished in any status other than DONE

    """
    # Local import kept function-scoped, but hoisted to the top of the
    # body instead of being buried between statements.
    import time

    request_id = async_query(user_id, dataset_id, product_id, query)
    status, _ = DBManager().get_request_status_and_reason(request_id)
    log.debug("sync query: status: %s", status)
    # Poll once a second until the worker reaches a terminal state.
    # NOTE(review): there is no timeout — a request stuck in QUEUED/RUNNING
    # blocks this call (and the HTTP request) forever; consider bounding it.
    while status in (
        RequestStatus.RUNNING,
        RequestStatus.QUEUED,
        RequestStatus.PENDING,
    ):
        time.sleep(1)
        status, _ = DBManager().get_request_status_and_reason(request_id)
        log.debug("sync query: status: %s", status)

    if status is RequestStatus.DONE:
        download_details = DBManager().get_download_details_for_request_id(
            request_id
        )
        return FileResponse(
            path=download_details.location_path,
            # Serve under the file's own basename.
            filename=download_details.location_path.split(os.sep)[-1],
        )
    # Any terminal status other than DONE (e.g. FAILED) is an error.
    raise exc.ProductRetrievingError(
        dataset_id=dataset_id,
        product_id=product_id,
        status=status.name,
    )
368+
298369

299370
@log_execution_time(log)
300371
def run_workflow(

api/app/endpoint_handlers/request.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ def get_request_resulting_size(request_id: int):
8686
If the request was not found
8787
"""
8888
if request := DBManager().get_request_details(request_id):
89-
return request.download.size_bytes
89+
size = request.download.size_bytes
90+
if not size or size == 0:
91+
raise exc.EmptyDatasetError(dataset_id=request.dataset,
92+
product_id=request.product)
93+
return size
9094
log.info(
9195
"request with id '%s' could not be found",
9296
request_id,

api/app/exceptions.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,16 @@ def __init__(self, dataset_id, product_id):
180180
product_id=product_id,
181181
)
182182
super().__init__(self.msg)
183+
184+
class ProductRetrievingError(BaseDDSException):
    """Retrieving of the product failed."""

    # Class-level template; formatted per-instance in __init__.
    msg: str = "Retrieving of the product '{dataset_id}.{product_id}' failed with the status {status}"

    def __init__(self, dataset_id, product_id, status):
        details = {
            "dataset_id": dataset_id,
            "product_id": product_id,
            "status": status,
        }
        self.msg = self.msg.format(**details)
        super().__init__(self.msg)

api/app/main.py

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import os
44
from typing import Optional
55

6-
from fastapi import FastAPI, HTTPException, Request, status
6+
from datetime import datetime
7+
8+
from fastapi import FastAPI, HTTPException, Request, status, Query
79
from fastapi.middleware.cors import CORSMiddleware
810
from starlette.middleware.authentication import AuthenticationMiddleware
911
from starlette.authentication import requires
@@ -18,6 +20,7 @@
1820

1921
from geoquery.geoquery import GeoQuery
2022
from geoquery.task import TaskList
23+
from geoquery.geoquery import GeoQuery
2124

2225
from utils.api_logging import get_dds_logger
2326
import exceptions as exc
@@ -32,6 +35,21 @@
3235
from const import venv, tags
3336
from auth import scopes
3437

38+
def map_to_geoquery(
    variables: list[str],
    format: str,
    bbox: str | None = None,  # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
    time: datetime | None = None,
    **format_kwargs
) -> GeoQuery:
    """Translate OGC-style request parameters into a `GeoQuery`.

    Parameters
    ----------
    variables : list of str
        Variables (WMS layers / feature IDs) to retrieve
    format : str
        Output format forwarded to the query (e.g. ``png``, ``geojson``)
    bbox : str, optional
        Comma-separated ``minx,miny,maxx,maxy`` (lon/lat order)
    time : datetime, optional
        Point in time to select
    **format_kwargs
        Format-specific arguments (width, height, transparent, ...)

    Returns
    -------
    query : GeoQuery
        The equivalent GeoQuery object
    """
    # bbox and time are declared optional by every caller, but the original
    # body dereferenced them unconditionally and crashed on None — translate
    # only the parameters actually provided.
    area = None
    if bbox is not None:
        coords = [float(coord) for coord in bbox.split(',')]
        area = {
            'west': coords[0],
            'south': coords[1],
            'east': coords[2],
            'north': coords[3],
        }
    time_ = None
    if time is not None:
        time_ = {
            'year': time.year,
            'month': time.month,
            'day': time.day,
            'hour': time.hour,
        }
    query = GeoQuery(variable=variables, time=time_, area=area,
                     format_args=format_kwargs, format=format)
    return query
52+
3553
logger = get_dds_logger(__name__)
3654

3755
# ======== JSON encoders extension ========= #
@@ -155,6 +173,100 @@ async def get_product_details(
155173
except exc.BaseDDSException as err:
156174
raise err.wrap_around_http_exception() from err
157175

176+
@app.get("/datasets/{dataset_id}/{product_id}/map", tags=[tags.DATASET])
@timer(
    app.state.api_request_duration_seconds,
    # Label carries the full /map route (was missing the suffix, which
    # merged these timings with the plain product route).
    labels={"route": "GET /datasets/{dataset_id}/{product_id}/map"},
)
async def get_map(
    request: Request,
    dataset_id: str,
    product_id: str,
    # OGC WMS parameters
    width: int,
    height: int,
    layers: str | None = None,
    format: str | None = 'png',
    time: datetime | None = None,
    transparent: bool | None = True,  # was the string 'true'; parameter is a bool
    bgcolor: str | None = 'FFFFFF',
    bbox: str | None = None,  # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
    crs: str | None = None,
    # OGC map parameters
    # subset: str | None = None,
    # subset_crs: str | None = Query(..., alias="subset-crs"),
    # bbox_crs: str | None = Query(..., alias="bbox-crs"),
):
    """Serve an OGC WMS-style map for the given dataset/product.

    Maps the WMS query parameters onto a `GeoQuery` and executes it
    synchronously, returning the rendered file.
    """
    app.state.api_http_requests_total.inc(
        {"route": "GET /datasets/{dataset_id}/{product_id}/map"}
    )
    # Map OGC parameters to GeoQuery fields:
    # variable / time / area / format (+ format_args for rendering options).
    query = map_to_geoquery(
        variables=layers,
        bbox=bbox,
        time=time,
        format=format,  # honour the requested format (was hard-coded to "png")
        width=width,
        height=height,
        transparent=transparent,
        bgcolor=bgcolor,
    )
    try:
        return dataset_handler.sync_query(
            user_id=request.user.id,
            dataset_id=dataset_id,
            product_id=product_id,
            query=query
        )
    except exc.BaseDDSException as err:
        raise err.wrap_around_http_exception() from err
226+
@app.get("/datasets/{dataset_id}/{product_id}/items/{feature_id}", tags=[tags.DATASET])
@timer(
    app.state.api_request_duration_seconds,
    labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"},
)
async def get_feature(
    request: Request,
    dataset_id: str,
    product_id: str,
    feature_id: str,
    # OGC feature parameters
    time: datetime | None = None,
    bbox: str | None = None,  # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
    crs: str | None = None,
    # OGC map parameters
    # subset: str | None = None,
    # subset_crs: str | None = Query(..., alias="subset-crs"),
    # bbox_crs: str | None = Query(..., alias="bbox-crs"),
):
    """Return a single feature of the product (OGC API - Features item).

    The OGC item parameters are mapped onto a `GeoQuery` restricted to
    the requested feature and rendered as GeoJSON.
    """
    route = "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"
    app.state.api_http_requests_total.inc({"route": route})
    # Translate the OGC feature request into the internal GeoQuery form
    # (variable / time / area), fixing the output format to GeoJSON.
    feature_query = map_to_geoquery(
        variables=[feature_id], bbox=bbox, time=time, format="geojson"
    )
    try:
        result = dataset_handler.sync_query(
            user_id=request.user.id,
            dataset_id=dataset_id,
            product_id=product_id,
            query=feature_query,
        )
    except exc.BaseDDSException as err:
        raise err.wrap_around_http_exception() from err
    return result
158270

159271
@app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET])
160272
@timer(
@@ -222,7 +334,7 @@ async def query(
222334
{"route": "POST /datasets/{dataset_id}/{product_id}/execute"}
223335
)
224336
try:
225-
return dataset_handler.query(
337+
return dataset_handler.async_query(
226338
user_id=request.user.id,
227339
dataset_id=dataset_id,
228340
product_id=product_id,

0 commit comments

Comments
 (0)