Skip to content

Commit 97973df

Browse files
authored
feature: add task filtering based on metadata (#449)
1 parent 93a5683 commit 97973df

File tree

4 files changed

+210
-1
lines changed

4 files changed

+210
-1
lines changed

services/data/postgres_async_db.py

+51
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,57 @@ async def get_metadata(
814814
}
815815
return await self.get_records(filter_dict=filter_dict)
816816

817+
async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name: str, field_name: str, pattern: str):
818+
"""
819+
Returns a list of task pathspecs that match the given field_name and regexp pattern for the value
820+
"""
821+
run_id_key, run_id_value = translate_run_key(run_id)
822+
filter_dict = {
823+
"flow_id": flow_id,
824+
run_id_key: run_id_value,
825+
"step_name": step_name,
826+
}
827+
conditions = [f"{k} = %s" for k, v in filter_dict.items() if v is not None]
828+
values = [v for k, v in filter_dict.items() if v is not None]
829+
830+
if field_name:
831+
conditions.append("field_name = %s")
832+
values.append(field_name)
833+
834+
if pattern:
835+
conditions.append("regexp_match(value, %s) IS NOT NULL")
836+
values.append(pattern)
837+
838+
# We must return distinct task pathspecs, so we construct the select statement by hand
839+
sql_template = """
840+
SELECT DISTINCT {select_columns} FROM (
841+
SELECT
842+
{keys}
843+
FROM {table_name}
844+
) T
845+
{where}
846+
{order_by}
847+
"""
848+
849+
select_sql = sql_template.format(
850+
keys=",".join(self.select_columns),
851+
table_name=self.table_name,
852+
where="WHERE {}".format(" AND ".join(conditions)),
853+
order_by="ORDER BY task_id",
854+
select_columns=",".join(["flow_id, run_number, run_id, step_name, task_name, task_id"])
855+
).strip()
856+
857+
db_response, pagination = await self.execute_sql(select_sql=select_sql, values=values, serialize=False)
858+
859+
# flatten the ids in the response
860+
def _format_id(row):
861+
flow_id, run_number, run_id, step_name, task_name, task_id = row
862+
# pathspec
863+
return f"{flow_id}/{run_id or run_number}/{step_name}/{task_name or task_id}"
864+
865+
flattened_response = DBResponse(body=[_format_id(row) for row in db_response.body], response_code=db_response.response_code)
866+
return flattened_response, pagination
867+
817868

818869
class AsyncArtifactTablePostgres(AsyncPostgresTable):
819870
artifact_dict = {}

services/metadata_service/api/task.py

+57
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ def __init__(self, app):
1919
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks",
2020
self.get_tasks,
2121
)
22+
app.router.add_route(
23+
"GET",
24+
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks",
25+
self.get_filtered_tasks,
26+
)
2227
app.router.add_route(
2328
"GET",
2429
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}",
@@ -34,6 +39,7 @@ def __init__(self, app):
3439
self.tasks_heartbeat)
3540
self._async_table = AsyncPostgresDB.get_instance().task_table_postgres
3641
self._async_run_table = AsyncPostgresDB.get_instance().run_table_postgres
42+
self._async_metadata_table = AsyncPostgresDB.get_instance().metadata_table_postgres
3743
self._db = AsyncPostgresDB.get_instance()
3844

3945
@format_response
@@ -76,6 +82,57 @@ async def get_tasks(self, request):
7682
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
7783
return db_response
7884

85+
@format_response
86+
@handle_exceptions
87+
async def get_filtered_tasks(self, request):
88+
"""
89+
---
90+
description: get all task ids that match the provided metadata field name and/or value.
91+
tags:
92+
- Tasks
93+
parameters:
94+
- name: "flow_id"
95+
in: "path"
96+
description: "flow_id"
97+
required: true
98+
type: "string"
99+
- name: "run_number"
100+
in: "path"
101+
description: "run_number"
102+
required: true
103+
type: "string"
104+
- name: "step_name"
105+
in: "path"
106+
description: "step_name"
107+
required: true
108+
type: "string"
109+
- name: "metadata_field_name"
110+
in: "query"
111+
description: "Metadata field name to filter with"
112+
type: "string"
113+
- name: "pattern"
114+
in: "query"
115+
description: "A regexp pattern to filter the metadata values on"
116+
type: "string"
117+
produces:
118+
- text/plain
119+
responses:
120+
"200":
121+
description: successful operation. Return tasks
122+
"405":
123+
description: invalid HTTP Method
124+
"""
125+
flow_id = request.match_info.get("flow_id")
126+
run_number = request.match_info.get("run_number")
127+
step_name = request.match_info.get("step_name")
128+
129+
# possible filters
130+
metadata_field = request.query.get("metadata_field_name", None)
131+
pattern = request.query.get("pattern", None)
132+
133+
db_response, _ = await self._async_metadata_table.get_filtered_task_pathspecs(flow_id, run_number, step_name, metadata_field, pattern)
134+
return db_response
135+
79136
@format_response
80137
@handle_exceptions
81138
async def get_task(self, request):

services/metadata_service/api/utils.py

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ def handle_exceptions(func):
5959
async def wrapper(*args, **kwargs):
6060
try:
6161
return await func(*args, **kwargs)
62+
except web.HTTPClientError as ex:
63+
return ServiceResponse(ex.status_code, ex.reason)
6264
except Exception as err:
6365
return http_500(str(err))
6466

services/metadata_service/tests/integration_tests/task_test.py

+100-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from .utils import (
44
cli, db,
55
assert_api_get_response, assert_api_post_response, compare_partial,
6-
add_flow, add_run, add_step, add_task, update_objects_with_run_tags
6+
add_flow, add_run, add_step, add_task, add_metadata, update_objects_with_run_tags
77
)
88
import pytest
99

@@ -185,6 +185,99 @@ async def test_tasks_get(cli, db):
185185
# getting tasks for non-existent step should return empty list
186186
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/nonexistent/tasks".format(**_first_task), status=200, data=[])
187187

188+
async def test_filtered_tasks_get(cli, db):
189+
# create a flow, run and step for the test
190+
_flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
191+
_run = (await add_run(db, flow_id=_flow["flow_id"])).body
192+
_step = (await add_step(db, flow_id=_run["flow_id"], run_number=_run["run_number"], step_name="first_step")).body
193+
194+
# add tasks to the step
195+
_first_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body
196+
_second_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body
197+
_third_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body
198+
199+
# add metadata to filter on
200+
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_a", "value": "value_a"}))
201+
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_b", "value": "value_b"}))
202+
203+
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_a", "value": "not_value_a"}))
204+
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_b", "value": "value_b"}))
205+
206+
# filtering with a shared key should return all relevant tasks
207+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a".format(**_first_task),
208+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
209+
210+
# filtering with a shared value should return all relevant tasks
211+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?pattern=value_b".format(**_first_task),
212+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
213+
214+
# filtering with a regexp should return all relevant tasks
215+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?pattern=value_.*".format(**_first_task),
216+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
217+
218+
# filtering with a shared key&value should return all relevant tasks
219+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&pattern=value_b".format(**_first_task),
220+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
221+
222+
# filtering with a shared value should return all relevant tasks
223+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&pattern=not_value_a".format(**_first_task),
224+
data=[task_pathspec(_second_task)])
225+
226+
# filtering with a mixed key&value should not return results
227+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&pattern=value_b".format(**_first_task),
228+
data=[])
229+
230+
# not providing filters should return all
231+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**_first_task), data=[task_pathspec(_first_task), task_pathspec(_second_task)])
232+
233+
234+
async def test_filtered_tasks_mixed_ids_get(cli, db):
235+
# create a flow, run and step for the test
236+
_flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
237+
_run = (await add_run(db, flow_id=_flow["flow_id"])).body
238+
_step = (await add_step(db, flow_id=_run["flow_id"], run_number=_run["run_number"], step_name="first_step")).body
239+
240+
# add tasks to the step
241+
_first_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"], task_name="first-task-1")).body
242+
# we need to refetch the task as the return does not contain the internal task ID we need for further record creation.
243+
_first_task = (await db.task_table_postgres.get_task(flow_id=_step["flow_id"], run_id=_step["run_number"], step_name=_step["step_name"], task_id="first-task-1", expanded=True)).body
244+
_second_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body
245+
246+
# add metadata to filter on
247+
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task['task_id'], task_name=_first_task["task_name"], metadata={"field_name":"field_a", "value": "value_a"}))
248+
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task['task_id'], task_name=_first_task["task_name"], metadata={"field_name":"field_b", "value": "value_b"}))
249+
250+
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_a", "value": "not_value_a"}))
251+
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_b", "value": "value_b"}))
252+
253+
# filtering with a shared key should return all relevant tasks
254+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a".format(**_first_task),
255+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
256+
257+
# filtering with a shared value should return all relevant tasks
258+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?pattern=value_b".format(**_first_task),
259+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
260+
261+
# # filtering with a regexp should return all relevant tasks
262+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?pattern=value_.*".format(**_first_task),
263+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
264+
265+
# filtering with a shared key&value should return all relevant tasks
266+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&pattern=value_b".format(**_first_task),
267+
data=[task_pathspec(_first_task), task_pathspec(_second_task)])
268+
269+
# filtering with a shared value should return all relevant tasks
270+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&pattern=not_value_a".format(**_first_task),
271+
data=[task_pathspec(_second_task)])
272+
273+
# filtering with a mixed key&value should not return results
274+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&pattern=value_b".format(**_first_task),
275+
data=[])
276+
277+
# not providing filters should return all
278+
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**_first_task), data=[task_pathspec(_first_task), task_pathspec(_second_task)])
279+
280+
188281

189282
async def test_task_get(cli, db):
190283
# create flow, run and step for test
@@ -206,3 +299,9 @@ async def test_task_get(cli, db):
206299
await assert_api_get_response(cli, "/flows/{flow_id}/runs/1234/steps/{step_name}/tasks/{task_id}".format(**_task), status=404)
207300
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/nonexistent_step/tasks/{task_id}".format(**_task), status=404)
208301
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/1234".format(**_task), status=404)
302+
303+
304+
# Helpers
305+
306+
def task_pathspec(task_dict):
307+
return f"{task_dict['flow_id']}/{task_dict['run_number']}/{task_dict['step_name']}/{task_dict.get('task_name', task_dict['task_id'])}"

0 commit comments

Comments
 (0)