Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions services/ui_backend_service/data/db/tables/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ def select_columns(self):
THEN 'completed'
WHEN attempt.attempt_ok IS FALSE
THEN 'failed'
WHEN attempt.attempt_ok IS NULL
AND attempt.task_ok_location IS NULL
AND {table_name}.last_heartbeat_ts IS NOT NULL
AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)>{heartbeat_threshold}
AND {finished_at_column} IS NULL
THEN 'killed'
WHEN COALESCE(attempt.attempt_finished_at, attempt.task_ok_finished_at) IS NOT NULL
AND attempt_ok IS NULL
THEN 'unknown'
Expand Down
11 changes: 5 additions & 6 deletions services/ui_backend_service/data/refiner/task_refiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
class TaskRefiner(Refinery):
"""
Refiner class for postprocessing Task rows.

Uses Metaflow Client API to refine Task's actual status from Metaflow Service and Datastore.

Parameters
-----------
cache : AsyncCacheClient
Expand Down Expand Up @@ -36,11 +34,12 @@ async def refine_record(self, record, values):
record['status'] = 'failed'
elif value is True:
record['status'] = 'completed'

# If task is failed and _task_ok key exists in values but is None,
# the process was killed externally (SIGKILL bypasses finally in task.py)
elif record['status'] == 'failed' and '_task_ok' in values and values['_task_ok'] is None:
record['status'] = 'killed'
if values.get('_foreach_stack'):
value = values['_foreach_stack']
if len(value) > 0 and len(value[0]) >= 4:
# The third one in the tuple is the foreach index. We access this way for backwards compatibility.
record['foreach_label'] = "{}[{}]".format(record['task_id'], value[0][3])

return record
return record