|
28 | 28 | from parameterized import parameterized
|
29 | 29 |
|
30 | 30 | from tests import resources
|
| 31 | +from tests.functional import TEST_DATA_ROOT |
31 | 32 | from tests.functional.utils import ResourcesUtil, WpsConfigBase
|
32 | 33 | from tests.utils import (
|
33 | 34 | MOCK_AWS_REGION,
|
@@ -2354,7 +2355,6 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil
|
2354 | 2355 | filter_match = responses.matchers.query_param_matcher({
|
2355 | 2356 | "filter": filter_value,
|
2356 | 2357 | "filter-lang": filter_lang,
|
2357 |
| - "timeout": "10", # added internally by collection processor |
2358 | 2358 | })
|
2359 | 2359 | else:
|
2360 | 2360 | filter_match = responses.matchers.json_params_matcher(filter_value)
|
@@ -2489,6 +2489,89 @@ def test_execute_job_with_collection_input_stac_items(self):
|
2489 | 2489 | assert "features" in out_data and isinstance(out_data["features"], list)
|
2490 | 2490 | assert len(out_data["features"]) == 1
|
2491 | 2491 |
|
| 2492 | + @parameterized.expand([ |
| 2493 | + ( |
| 2494 | + {"subset": "Lat(10:20),Lon(30:40)", "datetime": "2025-01-01/2025-01-02"}, |
| 2495 | + "?subset=Lat(10:20),Lon(30:40)&datetime=2025-01-01/2025-01-02", |
| 2496 | + ), |
| 2497 | + ( |
| 2498 | + {"subset": {"Lat": [10, 20], "Lon": [30, 40]}, "datetime": ["2025-01-01", "2025-01-02"]}, |
| 2499 | + "?subset=Lat(10:20),Lon(30:40)&datetime=2025-01-01/2025-01-02", |
| 2500 | + ), |
| 2501 | + ]) |
| 2502 | + def test_execute_job_with_collection_input_coverages_netcdf(self, coverage_parameters, coverage_request): |
| 2503 | + # type: (JSON, str) -> None |
| 2504 | + proc_name = "DockerNetCDF2Text" |
| 2505 | + body = self.retrieve_payload(proc_name, "deploy", local=True) |
| 2506 | + cwl = self.retrieve_payload(proc_name, "package", local=True) |
| 2507 | + body["executionUnit"] = [{"unit": cwl}] |
| 2508 | + proc_id = self.fully_qualified_test_name(self._testMethodName) |
| 2509 | + self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc_id) |
| 2510 | + |
| 2511 | + with contextlib.ExitStack() as stack: |
| 2512 | + tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames |
| 2513 | + tmp_svr = stack.enter_context(responses.RequestsMock(assert_all_requests_are_fired=False)) |
| 2514 | + test_file = "test.nc" |
| 2515 | + test_data = stack.enter_context(open(os.path.join(TEST_DATA_ROOT, test_file), mode="rb")).read() |
| 2516 | + |
| 2517 | + # coverage request expected with resolved query parameters matching submitted collection input parameters |
| 2518 | + col_url = f"{tmp_host}/collections/climate-data" |
| 2519 | + col_cov_url = f"{col_url}/coverage" |
| 2520 | + col_cov_req = f"{col_cov_url}{coverage_request}" |
| 2521 | + tmp_svr.add("GET", col_cov_req, body=test_data) |
| 2522 | + |
| 2523 | + col_exec_body = { |
| 2524 | + "mode": ExecuteMode.ASYNC, |
| 2525 | + "response": ExecuteResponse.DOCUMENT, |
| 2526 | + "inputs": { |
| 2527 | + "input_nc": { |
| 2528 | + "collection": col_url, |
| 2529 | + "format": ExecuteCollectionFormat.OGC_COVERAGE, # NOTE: this is the test! |
| 2530 | + "type": ContentType.APP_NETCDF, # must align with process input media-type |
| 2531 | + **coverage_parameters, |
| 2532 | + } |
| 2533 | + } |
| 2534 | + } |
| 2535 | + |
| 2536 | + for mock_exec in mocked_execute_celery(): |
| 2537 | + stack.enter_context(mock_exec) |
| 2538 | + proc_url = f"/processes/{proc_id}/execution" |
| 2539 | + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, |
| 2540 | + data=col_exec_body, headers=self.json_headers, only_local=True) |
| 2541 | + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" |
| 2542 | + |
| 2543 | + status_url = resp.json["location"] |
| 2544 | + results = self.monitor_job(status_url) |
| 2545 | + assert "output_txt" in results |
| 2546 | + |
| 2547 | + job_id = status_url.rsplit("/", 1)[-1] |
| 2548 | + log_url = f"{status_url}/logs" |
| 2549 | + log_txt = self.app.get(log_url, headers={"Accept": ContentType.TEXT_PLAIN}).text |
| 2550 | + cov_col = "coverage.nc" # file name applied by 'collection_processor' (resolved by 'format' + 'type' extension) |
| 2551 | + cov_out = "coverage.txt" # extension modified by invoked process from input file name, literal copy of NetCDF |
| 2552 | + assert cov_col in log_txt, "Resolved NetCDF file from collection handler should have been logged." |
| 2553 | + assert cov_out in log_txt, "Chained NetCDF copied by the process as text should have been logged." |
| 2554 | + |
| 2555 | + wps_dir = get_wps_output_dir(self.settings) |
| 2556 | + job_dir = os.path.join(wps_dir, job_id) |
| 2557 | + job_out = os.path.join(job_dir, "output_txt", cov_out) |
| 2558 | + assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]" |
| 2559 | + with open(job_out, mode="rb") as out_fd: # output, although ".txt" is actually a copy of the submitted NetCDF |
| 2560 | + out_data = out_fd.read(3) |
| 2561 | + assert out_data == b"CDF", "Output file from (collection + process) chain should contain the NetCDF header." |
| 2562 | + |
| 2563 | + for file_path in [ |
| 2564 | + os.path.join(job_dir, cov_col), |
| 2565 | + os.path.join(job_dir, "inputs", cov_col), |
| 2566 | + os.path.join(job_dir, "output_txt", cov_col), |
| 2567 | + os.path.join(job_out, cov_col), |
| 2568 | + os.path.join(job_out, "inputs", cov_col), |
| 2569 | + os.path.join(job_out, "output_txt", cov_col), |
| 2570 | + ]: |
| 2571 | + assert not os.path.exists(file_path), ( |
| 2572 | + f"Intermediate collection coverage file should not exist: [{file_path}]" |
| 2573 | + ) |
| 2574 | + |
2492 | 2575 | def test_execute_job_with_context_output_dir(self):
|
2493 | 2576 | cwl = {
|
2494 | 2577 | "cwlVersion": "v1.0",
|
|
0 commit comments