adjust collection parameters parsing to better support OGC API Coverages #787


Merged: 14 commits (Feb 14, 2025)

2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -46,7 +46,7 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-latest]
-        python-version: ["3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.10", "3.11", "3.12"]
        allow-failure: [false]
        test-case: [test-unit-only, test-func-only]
        include:
8 changes: 6 additions & 2 deletions CHANGES.rst
@@ -12,11 +12,15 @@ Changes

 Changes:
 --------
-- No change.
+- Update ``owslib==0.32.1`` for parameter fixes employed by *Collection Input* with ``format=ogc-coverage-collection``.
+- Drop support of Python 3.9 (required for the ``owslib==0.32.1`` dependency).

 Fixes:
 ------
-- No change.
+- Fix parsing of *Collection Input* ``format=ogc-coverage-collection`` and ``format=ogc-map-collection``
+  to provide additional parameters to the remote collection request.
+- Update ``pygeofilter>=0.3.1`` to resolve the ``filter-lang=FES`` parser consistently with other filters
+  (relates to `geopython/pygeofilter#102 <https://github.com/geopython/pygeofilter/pull/102>`_).

 .. _changes_6.2.0:
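
For context, the *Collection Input* behavior referenced by these entries can be exercised with an execution payload along these lines. This is a sketch mirroring the test added in this PR; the host, collection id, and input name are illustrative placeholders, and the media-type assumes Weaver's ``application/x-netcdf`` constant:

```python
# Sketch of an execution body using a Collection Input resolved through OGC API - Coverages.
# Host, collection id, and input name are illustrative placeholders.
execute_body = {
    "mode": "async",
    "response": "document",
    "inputs": {
        "input_nc": {
            "collection": "https://example.com/collections/climate-data",
            "format": "ogc-coverage-collection",  # selects the coverage resolution path
            "type": "application/x-netcdf",       # media-type requested for the coverage
            # remaining members are forwarded as parameters of the remote collection request
            "subset": "Lat(10:20),Lon(30:40)",
            "datetime": "2025-01-01/2025-01-02",
        }
    }
}
```
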
4 changes: 2 additions & 2 deletions README.rst
@@ -54,8 +54,8 @@ for each process.
     * - releases
       - | |version| |commits-since| |docker_image|

-.. |py_ver| image:: https://img.shields.io/badge/python-3.9%2B-blue.svg?logo=python
-    :alt: Requires Python 3.9+
+.. |py_ver| image:: https://img.shields.io/badge/python-3.10%2B-blue.svg?logo=python
+    :alt: Requires Python 3.10+
     :target: https://www.python.org/getit

 .. |commits-since| image:: https://img.shields.io/github/commits-since/crim-ca/weaver/6.2.0.svg?logo=github
12 changes: 4 additions & 8 deletions requirements.txt
@@ -71,11 +71,10 @@ mako
 # employed by cwltool -> schema-salad -> mistune
 #mistune>=2.0.3,<2.1
 mypy_boto3_s3
-numpy>=1.22.2,<2; python_version < "3.10"
-numpy>=1.22.2; python_version >= "3.10"
+numpy>=1.22.2
 # esgf-compute-api (cwt) needs oauthlib but doesn't add it in their requirements
 oauthlib
-owslib==0.29.3
+owslib==0.32.1
 PasteDeploy>=3.1.0; python_version >= "3.12"
 pint
 psutil
@@ -86,7 +85,7 @@ psutil
 # https://github.com/tschaub/ogcapi-features/tree/json-array-expression/extensions/cql/jfe
 # - extra 'backend-native' uses shapely for Python objects
 # - support FES, CQL(TEXT,JSON), CQL2(TEXT,JSON), ECQL, JFE
-pygeofilter[fes,backend-native]
+pygeofilter[fes,backend-native]>=0.3.1
 # pymongo>=4 breaks for some kombu combinations corresponding to pinned Celery
 # - https://github.com/crim-ca/weaver/issues/386
 # - https://github.com/celery/kombu/pull/1536
@@ -119,10 +118,7 @@ schema-salad>=8.3.20221209165047,<9
 shapely
 simplejson
 # urllib3 not directly required, pinned by Snyk to avoid CVE-2024-37891
-# Python<3.10 error via pip, avoid endless package install lookup error with botocore
-# (https://github.com/pypa/pip/issues/12827#issuecomment-2211291150)
-urllib3>=2.2.2 ; python_version >= "3.10"
-urllib3==1.26.19 ; python_version < "3.10"  # pyup: ignore
+urllib3>=2.2.2
 urlmatch
 xmltodict
 webob
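
Regarding the ``pygeofilter`` bump: each filter language is exposed through a dedicated ``parse`` entry point returning a common AST, and the referenced upstream change aligns the FES parser with that pattern. A minimal sketch using the ECQL parser, chosen here because its interface is stable (the FES parser follows the same shape per geopython/pygeofilter#102):

```python
# Minimal sketch: pygeofilter parsers expose a per-language parse() that returns
# a common AST, which backends (e.g. the 'native' backend) can then evaluate.
from pygeofilter.parsers.ecql import parse

ast = parse("depth > 10 AND name = 'site'")
print(ast)  # nested comparison/combination AST nodes
```
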
2 changes: 1 addition & 1 deletion setup.py
@@ -120,7 +120,7 @@ def _parse_requirements(file_path, requirements, links):
     package_data={"": ["*.mako"]},
     zip_safe=False,
     test_suite="tests",
-    python_requires=">=3.9, <4",
+    python_requires=">=3.10, <4",
     install_requires=REQUIREMENTS,
     dependency_links=LINKS,
     extras_require={
1 change: 1 addition & 0 deletions tests/functional/__init__.py
@@ -1,3 +1,4 @@
 import os

 APP_PKG_ROOT = os.path.join(os.path.dirname(__file__), "application-packages")
+TEST_DATA_ROOT = os.path.join(os.path.dirname(__file__), "test-data")
Binary file added tests/functional/test-data/test.nc
85 changes: 84 additions & 1 deletion tests/functional/test_wps_package.py
@@ -28,6 +28,7 @@
 from parameterized import parameterized

 from tests import resources
+from tests.functional import TEST_DATA_ROOT
 from tests.functional.utils import ResourcesUtil, WpsConfigBase
 from tests.utils import (
     MOCK_AWS_REGION,
@@ -2354,7 +2355,6 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil
             filter_match = responses.matchers.query_param_matcher({
                 "filter": filter_value,
                 "filter-lang": filter_lang,
-                "timeout": "10",  # added internally by collection processor
             })
         else:
             filter_match = responses.matchers.json_params_matcher(filter_value)
@@ -2489,6 +2489,89 @@ def test_execute_job_with_collection_input_stac_items(self):
         assert "features" in out_data and isinstance(out_data["features"], list)
         assert len(out_data["features"]) == 1

+    @parameterized.expand([
+        (
+            {"subset": "Lat(10:20),Lon(30:40)", "datetime": "2025-01-01/2025-01-02"},
+            "?subset=Lat(10:20),Lon(30:40)&datetime=2025-01-01/2025-01-02",
+        ),
+        (
+            {"subset": {"Lat": [10, 20], "Lon": [30, 40]}, "datetime": ["2025-01-01", "2025-01-02"]},
+            "?subset=Lat(10:20),Lon(30:40)&datetime=2025-01-01/2025-01-02",
+        ),
+    ])
+    def test_execute_job_with_collection_input_coverages_netcdf(self, coverage_parameters, coverage_request):
+        # type: (JSON, str) -> None
+        proc_name = "DockerNetCDF2Text"
+        body = self.retrieve_payload(proc_name, "deploy", local=True)
+        cwl = self.retrieve_payload(proc_name, "package", local=True)
+        body["executionUnit"] = [{"unit": cwl}]
+        proc_id = self.fully_qualified_test_name(self._testMethodName)
+        self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc_id)
+
+        with contextlib.ExitStack() as stack:
+            tmp_host = "https://mocked-file-server.com"  # must match collection prefix hostnames
+            tmp_svr = stack.enter_context(responses.RequestsMock(assert_all_requests_are_fired=False))
+            test_file = "test.nc"
+            test_data = stack.enter_context(open(os.path.join(TEST_DATA_ROOT, test_file), mode="rb")).read()
+
+            # coverage request expected with resolved query parameters matching submitted collection input parameters
+            col_url = f"{tmp_host}/collections/climate-data"
+            col_cov_url = f"{col_url}/coverage"
+            col_cov_req = f"{col_cov_url}{coverage_request}"
+            tmp_svr.add("GET", col_cov_req, body=test_data)
+
+            col_exec_body = {
+                "mode": ExecuteMode.ASYNC,
+                "response": ExecuteResponse.DOCUMENT,
+                "inputs": {
+                    "input_nc": {
+                        "collection": col_url,
+                        "format": ExecuteCollectionFormat.OGC_COVERAGE,  # NOTE: this is the test!
+                        "type": ContentType.APP_NETCDF,  # must align with process input media-type
+                        **coverage_parameters,
+                    }
+                }
+            }
+
+            for mock_exec in mocked_execute_celery():
+                stack.enter_context(mock_exec)
+            proc_url = f"/processes/{proc_id}/execution"
+            resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
+                                       data=col_exec_body, headers=self.json_headers, only_local=True)
+            assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"
+
+            status_url = resp.json["location"]
+            results = self.monitor_job(status_url)
+            assert "output_txt" in results
+
+            job_id = status_url.rsplit("/", 1)[-1]
+            log_url = f"{status_url}/logs"
+            log_txt = self.app.get(log_url, headers={"Accept": ContentType.TEXT_PLAIN}).text
+            cov_col = "coverage.nc"   # file name applied by 'collection_processor' (resolved by 'format' + 'type' extension)
+            cov_out = "coverage.txt"  # extension modified by the invoked process from the input file name, literal copy of the NetCDF
+            assert cov_col in log_txt, "Resolved NetCDF file from collection handler should have been logged."
+            assert cov_out in log_txt, "Chained NetCDF copied by the process as text should have been logged."
+
+            wps_dir = get_wps_output_dir(self.settings)
+            job_dir = os.path.join(wps_dir, job_id)
+            job_out = os.path.join(job_dir, "output_txt", cov_out)
+            assert os.path.isfile(job_out), f"Expected output file not found: [{job_out}]"
+            with open(job_out, mode="rb") as out_fd:  # output, although ".txt", is actually a copy of the submitted NetCDF
+                out_data = out_fd.read(3)
+            assert out_data == b"CDF", "Output file from (collection + process) chain should contain the NetCDF header."
+
+            for file_path in [
+                os.path.join(job_dir, cov_col),
+                os.path.join(job_dir, "inputs", cov_col),
+                os.path.join(job_dir, "output_txt", cov_col),
+                os.path.join(job_out, cov_col),
+                os.path.join(job_out, "inputs", cov_col),
+                os.path.join(job_out, "output_txt", cov_col),
+            ]:
+                assert not os.path.exists(file_path), (
+                    f"Intermediate collection coverage file should not exist: [{file_path}]"
+                )

     def test_execute_job_with_context_output_dir(self):
         cwl = {
             "cwlVersion": "v1.0",
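
Both parameterized cases above are expected to resolve to the same coverage request; the equivalence hinges on serializing the ``subset`` mapping back into the OGC ``Dim(low:high)`` notation. A rough sketch of that serialization, inferred from the mocked URL in the test (``serialize_subset`` is a hypothetical helper, not a Weaver function):

```python
# Sketch: both accepted 'subset' forms serialize to the same OGC API - Coverages query.
# 'serialize_subset' is a hypothetical helper for illustration only.
def serialize_subset(subset):
    if isinstance(subset, str):  # already in 'Lat(10:20),Lon(30:40)' notation
        return subset
    return ",".join(f"{dim}({low}:{high})" for dim, (low, high) in subset.items())

assert serialize_subset("Lat(10:20),Lon(30:40)") == "Lat(10:20),Lon(30:40)"
assert serialize_subset({"Lat": [10, 20], "Lon": [30, 40]}) == "Lat(10:20),Lon(30:40)"
```
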
6 changes: 0 additions & 6 deletions weaver/compat.py
@@ -38,9 +38,3 @@ def patch(self) -> int:
     @property
     def micro(self) -> int:
         return self.patch
-
-try:
-    from functools import cache  # pylint: disable=unused-import  # definition for other modules to import
-except ImportError:  # python<3.9  # pragma: no cover
-    from functools import lru_cache
-    cache = lru_cache(maxsize=None)
2 changes: 1 addition & 1 deletion weaver/formats.py
@@ -5,6 +5,7 @@
 import os
 import re
 import socket
+from functools import cache
 from typing import TYPE_CHECKING, cast, overload
 from urllib.error import HTTPError, URLError
 from urllib.request import urlopen
@@ -17,7 +18,6 @@
 from requests.exceptions import ConnectionError

 from weaver.base import Constants, classproperty
-from weaver.compat import cache

 if TYPE_CHECKING:
     from typing import Any, AnyStr, Dict, List, Optional, Tuple, TypeAlias, TypeVar, Union
2 changes: 1 addition & 1 deletion weaver/processes/builtin/__init__.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import sys
+from functools import cache
 from importlib import import_module
 from string import Template
 from typing import TYPE_CHECKING
@@ -12,7 +13,6 @@
 from cwltool.singularity import SingularityCommandLineJob

 from weaver import WEAVER_ROOT_DIR
-from weaver.compat import cache
 from weaver.database import get_db
 from weaver.datatype import Process
 from weaver.exceptions import PackageExecutionError, PackageNotFound, ProcessNotAccessible, ProcessNotFound
58 changes: 44 additions & 14 deletions weaver/processes/builtin/collection_processor.py
@@ -41,7 +41,7 @@
 from weaver.wps_restapi import swagger_definitions as sd  # isort:skip  # noqa: E402

 if TYPE_CHECKING:
-    from typing import List
+    from typing import Dict, List

     from pystac import Asset

@@ -127,8 +127,13 @@
     api_url, col_id = col_parts if len(col_parts) == 2 else (None, col_parts[0])
     col_id_alt = get_any_id(col_input, pop=True)
     col_id = col_id or col_id_alt
+    timeout = col_args.pop("timeout", 10)

-    col_args.setdefault("timeout", 10)
+    # convert all query parameters to their corresponding function parameter name
+    for arg in list(col_args):
+        if "-" in arg:
+            col_args[arg.replace("-", "_")] = col_args.pop(arg)
+    col_args = parse_collection_parameters(col_args)

     logger.log(  # pylint: disable=E1205  # false positive
         logging.INFO,
@@ -144,7 +149,7 @@
         col_href,
         queries=col_args,
         headers={"Accept": f"{ContentType.APP_GEOJSON},{ContentType.APP_JSON}"},
-        timeout=col_args["timeout"],
+        timeout=timeout,
         retries=3,
         only_server_errors=False,
     )
@@ -161,16 +166,12 @@
             resolved_files.append(file_obj)

     elif col_fmt in [ExecuteCollectionFormat.STAC, ExecuteCollectionFormat.STAC_ITEMS]:
-        # convert all parameters to their corresponding name of the query utility, and ignore unknown ones
-        for arg in list(col_args):
-            if "-" in arg:
-                col_args[arg.replace("-", "_")] = col_args.pop(arg)
+        # ignore unknown or enforced parameters
         known_params = set(inspect.signature(ItemSearch).parameters)
         known_params -= {"url", "method", "stac_io", "client", "collection", "ids", "modifier"}
         for param in set(col_args) - known_params:
             col_args.pop(param)

-        timeout = col_args.pop("timeout", 10)
         search_url = f"{api_url}/search"
         search = ItemSearch(
             url=search_url,
Expand All @@ -193,12 +194,18 @@
resolved_files.append(file_obj)

elif col_fmt == ExecuteCollectionFormat.OGC_FEATURES:
if str(col_args.get("filter-lang")) == "cql2-json":
if str(col_args.get("filter_lang")) == "cql2-json":
col_args["cql"] = col_args.pop("filter")
col_args.pop("filter_lang")
else:
for arg in list(col_args):
if arg.startswith("filter_"):
col_args[arg.replace("_", "-")] = col_args.pop(arg)
search = Features(
url=api_url,
# FIXME: add 'auth' or 'headers' authorization/cookies?
headers={"Accept": f"{ContentType.APP_GEOJSON}, {ContentType.APP_VDN_GEOJSON}, {ContentType.APP_JSON}"},
json_="{}", # avoid unnecessary request on init
)
items = search.collection_items(col_id, **col_args)
if items.get("type") != "FeatureCollection" or "features" not in items:
@@ -228,13 +235,14 @@
             url=api_url,
             # FIXME: add 'auth' or 'headers' authorization/cookies?
             headers={"Accept": ContentType.APP_JSON},
+            json_="{}",  # avoid unnecessary request on init
         )
         ctype = (col_media_type or [ContentType.IMAGE_COG])[0]
         ext = get_extension(ctype, dot=False)
-        path = os.path.join(output_dir, f"map.{ext}")
+        path = os.path.join(output_dir, f"coverage.{ext}")
         with open(path, mode="wb") as file:
-            data = cast(io.BytesIO, cov.coverage(col_id)).getbuffer()
-            file.write(data)  # type: ignore
+            data = cast(io.BytesIO, cov.coverage(col_id, **col_args)).getbuffer()
+            file.write(data)
         _, file_fmt = get_cwl_file_format(ctype)
         file_obj = {"class": PACKAGE_FILE_TYPE, "path": f"file://{path}", "format": file_fmt}
         resolved_files.append(file_obj)
@@ -244,13 +252,14 @@
             url=api_url,
             # FIXME: add 'auth' or 'headers' authorization/cookies?
             headers={"Accept": ContentType.APP_JSON},
+            json_="{}",  # avoid unnecessary request on init
         )
         ctype = (col_media_type or [ContentType.IMAGE_COG])[0]
         ext = get_extension(ctype[0], dot=False)
         path = os.path.join(output_dir, f"map.{ext}")
         with open(path, mode="wb") as file:
-            data = cast(io.BytesIO, maps.map(col_id)).getbuffer()
-            file.write(data)  # type: ignore
+            data = cast(io.BytesIO, maps.map(col_id, **col_args)).getbuffer()
+            file.write(data)

[Codecov: added lines #L261-L262 in weaver/processes/builtin/collection_processor.py not covered by tests]

         _, file_fmt = get_cwl_file_format(ctype)
         file_obj = {"class": PACKAGE_FILE_TYPE, "path": f"file://{path}", "format": file_fmt}
         resolved_files.append(file_obj)
@@ -271,6 +280,27 @@
     return resolved_files


+def parse_collection_parameters(parameters):
+    # type: (Dict[str, JSON]) -> Dict[str, JSON]
+    """
+    Applies any relevant conversions of known parameters between the allowed request format and the expected utilities.
+    """
+    if not parameters:
+        return {}
+

[Codecov: added line #L289 in weaver/processes/builtin/collection_processor.py not covered by tests]

+    subset = parameters.get("subset")
+    if subset and isinstance(subset, str):
+        subset_dims = {}
+        for item in subset.split(","):
+            dim, span = item.split("(", 1)
+            span = span.split(")", 1)[0]
+            ranges = span.split(":")
+            subset_dims[dim] = list(ranges)
+        parameters["subset"] = subset_dims
+
+    return parameters
+
+
 def process_cwl(collection_input, input_definition, output_dir):
     # type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap
     files = process_collection(collection_input, input_definition, output_dir)
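
Taken together, the renaming loop in ``process_collection`` and the new ``parse_collection_parameters`` normalize the raw collection arguments before they reach the ``owslib`` clients. A worked example of the combined transformation, reusing ``parse_collection_parameters`` as defined above (the input values are illustrative):

```python
# Worked example of the normalization applied to collection input parameters.
col_args = {"subset": "Lat(10:20),Lon(30:40)", "filter-lang": "cql2-text", "timeout": 10}

timeout = col_args.pop("timeout", 10)  # extracted for the HTTP request itself
for arg in list(col_args):             # query parameter names -> function parameter names
    if "-" in arg:
        col_args[arg.replace("-", "_")] = col_args.pop(arg)
col_args = parse_collection_parameters(col_args)

# col_args is now:
# {"subset": {"Lat": ["10", "20"], "Lon": ["30", "40"]}, "filter_lang": "cql2-text"}
```

Note that the subset bounds remain strings; the downstream utilities receive the per-dimension ranges as parsed from the request.
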
2 changes: 1 addition & 1 deletion weaver/processes/wps_package.py
@@ -20,6 +20,7 @@
 import tempfile
 import time
 import uuid
+from functools import cache
 from typing import TYPE_CHECKING, cast, overload
 from urllib.parse import parse_qsl, urlparse

@@ -48,7 +49,6 @@
 from pywps.validator.mode import MODE
 from requests.structures import CaseInsensitiveDict

-from weaver.compat import cache
 from weaver.config import WeaverConfiguration, WeaverFeature, get_weaver_configuration
 from weaver.database import get_db
 from weaver.datatype import DockerAuthentication