Skip to content

Commit 4f4e950

Browse files
authored
Merge pull request #9 from actinia-org/execute_process
Execute a process
2 parents 526d02f + 30eacce commit 4f4e950

File tree

10 files changed

+393
-24
lines changed

10 files changed

+393
-24
lines changed

config/mount/sample.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[ACTINIA]
22
processing_base_url = http://actinia-core:8088/api/v3
33
use_actinia_modules = True
4+
default_project = "nc_spm_08"
45

56
[LOGCONFIG]
67
logfile = actinia-ogc-api-processes-plugin.log

docker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ services:
2525

2626

2727
actinia-core:
28-
image: mundialis/actinia:2.13.1
28+
image: mundialis/actinia:2.14.0
2929
# ports:
3030
# - "8088:8088"
3131
depends_on:
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#!/usr/bin/env python
2+
"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG.
3+
4+
SPDX-License-Identifier: GPL-3.0-or-later
5+
6+
Process Execution class
7+
"""
8+
9+
__license__ = "GPL-3.0-or-later"
10+
__author__ = "Carmen Tawalika"
11+
__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG"
12+
__maintainer__ = "mundialis GmbH & Co. KG"
13+
14+
from flask import jsonify, make_response
15+
from flask_restful_swagger_2 import Resource, request, swagger
16+
from requests.exceptions import ConnectionError # noqa: A004
17+
18+
from actinia_ogc_api_processes_plugin.apidocs import process_execution
19+
from actinia_ogc_api_processes_plugin.authentication import require_basic_auth
20+
from actinia_ogc_api_processes_plugin.core.actinia_common import (
21+
safe_parse_actinia_job,
22+
)
23+
from actinia_ogc_api_processes_plugin.core.process_execution import (
24+
generate_new_joblinks,
25+
post_process_execution,
26+
)
27+
from actinia_ogc_api_processes_plugin.model.response_models import (
28+
SimpleStatusCodeResponseModel,
29+
)
30+
from actinia_ogc_api_processes_plugin.resources.logging import log
31+
32+
33+
class ProcessExecution(Resource):
34+
"""ProcessExecution handling."""
35+
36+
@require_basic_auth()
37+
@swagger.doc(process_execution.describe_process_execution_post_docs)
38+
def post(self, process_id):
39+
"""ProcessExecution post method.
40+
41+
Execute a process for the given process_id.
42+
"""
43+
try:
44+
postbody = request.json
45+
resp = post_process_execution(process_id, postbody)
46+
if resp.status_code == 200:
47+
job_id, status_info = safe_parse_actinia_job(resp.json())
48+
if job_id not in status_info.get("links"):
49+
status_info["links"] = generate_new_joblinks(job_id)
50+
return make_response(status_info, 201)
51+
elif resp.status_code == 401:
52+
log.error("ERROR: Unauthorized Access")
53+
log.debug(f"actinia response: {resp.text}")
54+
res = jsonify(
55+
SimpleStatusCodeResponseModel(
56+
status=401,
57+
message="ERROR: Unauthorized Access",
58+
),
59+
)
60+
return make_response(res, 401)
61+
elif resp.status_code == 404:
62+
log.error("ERROR: No such process")
63+
log.debug(f"actinia response: {resp.text}")
64+
# If operation is executed using an invalid process identifier,
65+
# the response SHALL be HTTP status code 404.
66+
# The content of that response SHALL be based upon
67+
# the OpenAPI 3.0 schema exception.yaml.
68+
# https://schemas.opengis.net/ogcapi/processes/part1/1.0/openapi/schemas/exception.yaml
69+
# The type of the exception SHALL be:
70+
# “http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-process”.
71+
res = jsonify(
72+
{
73+
"type": "http://www.opengis.net/def/exceptions/"
74+
"ogcapi-processes-1/1.0/no-such-process",
75+
"title": "No Such Process",
76+
"status": 404,
77+
"detail": f"Process '{process_id}' not found",
78+
},
79+
)
80+
return make_response(res, 404)
81+
else:
82+
log.error("ERROR: Internal Server Error")
83+
log.debug(f"actinia status code: {resp.status_code}")
84+
log.debug(f"actinia response: {resp.text}")
85+
res = jsonify(
86+
SimpleStatusCodeResponseModel(
87+
status=500,
88+
message="ERROR: Internal Server Error",
89+
),
90+
)
91+
return make_response(res, 500)
92+
except ConnectionError as e:
93+
log.error(f"Connection ERROR: {e}")
94+
res = jsonify(
95+
SimpleStatusCodeResponseModel(
96+
status=503,
97+
message=f"Connection ERROR: {e}",
98+
),
99+
)
100+
return make_response(res, 503)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env python
2+
"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG.
3+
4+
SPDX-License-Identifier: GPL-3.0-or-later
5+
6+
Process Execution class
7+
"""
8+
9+
__license__ = "GPL-3.0-or-later"
10+
__author__ = "Carmen Tawalika, Lina Krisztian"
11+
__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG"
12+
__maintainer__ = "mundialis GmbH & Co. KG"
13+
14+
15+
from actinia_ogc_api_processes_plugin.model.response_models import (
16+
SimpleStatusCodeResponseModel,
17+
)
18+
19+
describe_process_execution_post_docs = {
20+
# "summary" is taken from the description of the get method
21+
"tags": ["process_execution"],
22+
"description": "Executes a process.",
23+
"responses": {
24+
"201": {
25+
"description": "This response returns the status info of the "
26+
"successfully started process.",
27+
},
28+
"401": {
29+
"description": (
30+
"This response returns an "
31+
"'Unauthorized Access' error message"
32+
),
33+
"schema": SimpleStatusCodeResponseModel,
34+
},
35+
"404": {
36+
"description": (
37+
"This response returns an 'No such process' error message"
38+
),
39+
"schema": SimpleStatusCodeResponseModel,
40+
},
41+
"500": {
42+
"description": (
43+
"This response returns an "
44+
"'Internal Server Error' error message"
45+
),
46+
"schema": SimpleStatusCodeResponseModel,
47+
},
48+
"503": {
49+
"description": (
50+
"This response returns an 'Connection Error' error message"
51+
),
52+
"schema": SimpleStatusCodeResponseModel,
53+
},
54+
},
55+
}

src/actinia_ogc_api_processes_plugin/core/actinia_common.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ def calculate_finished(data: dict):
115115
return finished_dt.replace(microsecond=0).isoformat()
116116

117117

118+
def parse_actinia_job_id(data):
119+
"""Parse actinia job response json to extract job_id."""
120+
job_id = data.get("resource_id")
121+
if job_id and job_id.startswith("resource_id-"):
122+
job_id = job_id.removeprefix("resource_id-")
123+
return job_id
124+
125+
118126
def parse_actinia_job(job_id, data):
119127
"""Parse actinia job response json into status_info dict."""
120128
status_info = {}
@@ -176,3 +184,23 @@ def parse_actinia_job(job_id, data):
176184
status_info["links"] = links
177185

178186
return status_info
187+
188+
189+
def safe_parse_actinia_job(data):
190+
"""Return (job_id, status_info) or (None, None) for invalid items."""
191+
if not isinstance(data, dict):
192+
return None, None
193+
job_id = parse_actinia_job_id(data)
194+
if not job_id:
195+
return None, None
196+
try:
197+
status_info = parse_actinia_job(job_id, data)
198+
except (TypeError, ValueError):
199+
status_info = {
200+
"jobID": job_id,
201+
"type": "process",
202+
"processID": data.get("resource_id"),
203+
"status": data.get("status"),
204+
"links": [],
205+
}
206+
return job_id, status_info

src/actinia_ogc_api_processes_plugin/core/job_list.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from requests.auth import HTTPBasicAuth
1919

2020
from actinia_ogc_api_processes_plugin.core.actinia_common import (
21-
parse_actinia_job,
21+
safe_parse_actinia_job,
2222
)
2323
from actinia_ogc_api_processes_plugin.resources.config import ACTINIA
2424
from actinia_ogc_api_processes_plugin.resources.logging import log
@@ -62,26 +62,6 @@ def _generate_new_joblinks(job_id: str) -> list[dict]:
6262
return [{"href": job_href, "rel": "status"}]
6363

6464

65-
def _safe_parse_item(item):
66-
"""Return (job_id, status_info) or (None, None) for invalid items."""
67-
if not isinstance(item, dict):
68-
return None, None
69-
job_id = item.get("resource_id").removeprefix("resource_id-")
70-
if not job_id:
71-
return None, None
72-
try:
73-
status_info = parse_actinia_job(job_id, item)
74-
except (TypeError, ValueError):
75-
status_info = {
76-
"jobID": job_id,
77-
"type": "process",
78-
"processID": item.get("resource_id"),
79-
"status": item.get("status"),
80-
"links": [],
81-
}
82-
return job_id, status_info
83-
84-
8565
def _get_datetime_interval(datetime_param):
8666
"""Parse datetime query parameter into (start, end) datetime tuple.
8767
@@ -269,7 +249,7 @@ def parse_actinia_jobs(
269249
jobs = []
270250

271251
for item in items:
272-
job_id, status_info = _safe_parse_item(item)
252+
job_id, status_info = safe_parse_actinia_job(item)
273253
if not job_id:
274254
continue
275255

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#!/usr/bin/env python
2+
"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG.
3+
4+
SPDX-License-Identifier: GPL-3.0-or-later
5+
6+
Process Execution core functionality
7+
"""
8+
9+
__license__ = "GPL-3.0-or-later"
10+
__author__ = "Carmen Tawalika, Lina Krisztian"
11+
__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG"
12+
__maintainer__ = "mundialis GmbH & Co. KG"
13+
14+
15+
import requests
16+
from flask import has_request_context, request
17+
from requests.auth import HTTPBasicAuth
18+
19+
from actinia_ogc_api_processes_plugin.resources.config import ACTINIA
20+
21+
22+
def generate_new_joblinks(job_id: str) -> list[dict]:
23+
"""Make sure job_id is in the link."""
24+
base = request.url_root.rstrip("/") if has_request_context() else "/"
25+
job_href = f"{base}/jobs/{job_id}"
26+
return [{"href": job_href, "rel": "status"}]
27+
28+
29+
def _transform_to_actinia_process_chain(
30+
process_id: str,
31+
execute_request: dict,
32+
) -> dict:
33+
"""Transform execute postbody to actinia process chain format."""
34+
inputs = execute_request.get("inputs", [])
35+
inputs_array = [
36+
{"param": key, "value": value} for key, value in inputs.items()
37+
]
38+
39+
return {
40+
"list": [
41+
{
42+
"id": f"{process_id}_1",
43+
"module": process_id,
44+
"inputs": inputs_array,
45+
},
46+
],
47+
"version": "1",
48+
}
49+
50+
51+
def post_process_execution(
52+
process_id: str | None = None,
53+
postbody: str | None = None,
54+
):
55+
"""Start job for given process_id."""
56+
# Authentication for actinia
57+
auth = request.authorization
58+
kwargs = dict()
59+
kwargs["auth"] = HTTPBasicAuth(auth.username, auth.password)
60+
61+
project_name = ACTINIA.default_project
62+
resp = requests.get(
63+
f"{ACTINIA.processing_base_url}/actinia_modules/{process_id}",
64+
**kwargs,
65+
)
66+
if resp.status_code != 200:
67+
return resp
68+
69+
# TODO: Do we stick to the convention introduced in
70+
# https://github.com/actinia-org/actinia-module-plugin/pull/64
71+
# that the project name must be set in the template?
72+
# Or do we add it as a parameter in the process description?
73+
# If in template, a string would make more sense instead of listing
74+
# all projects which with the module was tested.
75+
if len(resp.json().get("projects")) > 1:
76+
project_name = resp.json().get("projects")[0]
77+
78+
pc = _transform_to_actinia_process_chain(process_id, postbody)
79+
kwargs["json"] = pc
80+
81+
# Start process via actinia-module-plugin
82+
url_process_execution = (
83+
f"{ACTINIA.processing_base_url}/projects/{project_name}/"
84+
"processing_export"
85+
)
86+
87+
return requests.post(
88+
url_process_execution,
89+
**kwargs,
90+
)

src/actinia_ogc_api_processes_plugin/endpoints.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
from actinia_ogc_api_processes_plugin.api.process_description import (
2020
ProcessDescription,
2121
)
22+
from actinia_ogc_api_processes_plugin.api.process_execution import (
23+
ProcessExecution,
24+
)
2225
from actinia_ogc_api_processes_plugin.api.process_list import ProcessList
2326

2427

@@ -40,3 +43,7 @@ def api_endpoint():
4043
apidoc.add_resource(JobStatusInfo, "/jobs/<string:job_id>")
4144
apidoc.add_resource(ProcessList, "/processes")
4245
apidoc.add_resource(ProcessDescription, "/processes/<string:process_id>")
46+
apidoc.add_resource(
47+
ProcessExecution,
48+
"/processes/<string:process_id>/execution",
49+
)

src/actinia_ogc_api_processes_plugin/resources/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
class ACTINIA:
2929
"""Default config for actinia processing."""
3030

31-
processing_base_url = "http://localhost:3000/"
31+
processing_base_url = "http://localhost:8088/"
3232
use_actinia_modules = True
33+
default_project = "nc_spm_08"
3334

3435

3536
class LOGCONFIG:
@@ -77,6 +78,11 @@ def __init__(self) -> None:
7778
"ACTINIA",
7879
"use_actinia_modules",
7980
)
81+
if config.has_option("ACTINIA", "default_project"):
82+
ACTINIA.default_project = config.get(
83+
"ACTINIA",
84+
"default_project",
85+
)
8086

8187
# LOGGING
8288
if config.has_section("LOGCONFIG"):

0 commit comments

Comments
 (0)